Added support to connect and perform CRUD operations with couchbase  (#3138)

* Implemented Couchbase binary protocol support

* added support for single connection type for couchbase

* removed unnecessary cout statements

* added protocol code for helo packet

* fixed vbucketID code for identification, fixed add and get functions

* Added test cases for threaded get and add functions

* Added Error Handling code and made upsert and delete examples

* added makefile for example/couchbase_c++

* fixed bugs in couchbase header files

* Added License and formatted to google c++ norms

* fixed bugs, added support for collections and added couchbase_client.md

* fixed license issue

* added custom logic for caching collectionIDs

* added caching of collection manifests

* Added example code for multithreaded demonstration

* updated CMake

* Abstracted CRUD operations

* Added pipeline/batching support

* commented unused variables

* Updated support for C++17

* fixed some issue.

* Using Mutex instead of shared lock to support c++11

* Formatted code to google c++ format

* Introduced local cache per-instance of CouchbaseOperations and added functionality to handle server side manifest updates.

* Delete MODULE.bazel.lock

Unnecessary file

* Fixed bugs in local collection cache and collection refresh logic

* remove recurring statements

* Fixed bugs/repetitive calls to refreshing manifest on server

* Formatted function/variable naming scheme and formatted code in c++ google format

* removed unnecessary code

* updated comments

* updated comments

* updated documentation

* updated documentation

* updated documentation

* updated documentation

* Updated documentation

* Updated documentation

* Update documentation

* Added features and fixed bugs in multithreaded environment

Using connection_groups to differentiate between connections across CouchbaseOperations instances to different buckets.

Renamed the CollectionManifestTracker class to CollectionManifestManager and moved all related functionality into it; previously the refresh method lived outside this class.

Added two authentication methods: authenticate (not secure) and authenticateSSL (secure)

* Updated multithreaded and single threaded code.

Added an example where a single instance is being shared across the threads when operating on single bucket.

* updated documentation

Updated the documentation on thread-safe operations and fixed small discrepancies.

* removed commented code and updated readme to have links for cluster download certificate

* removed unused code.

* Added traditional bRPC coding approach

The traditional bRPC coding approach doesn't use high-level functions but gives the user more control

fixed formatting issues.

fixed the bug in couchbase.cpp where logic to check the cache is empty was inverted

* updated couchbase_example.md

* added unit test cases

* removed using namespace std from couchbase.h

* restored original CMakeLists.txt
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 06aee44..9b5db48 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -603,4 +603,4 @@
 
 # Install pkgconfig
 configure_file(cmake/brpc.pc.in ${PROJECT_BINARY_DIR}/brpc.pc @ONLY)
-install(FILES ${PROJECT_BINARY_DIR}/brpc.pc DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig)
+install(FILES ${PROJECT_BINARY_DIR}/brpc.pc DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig)
\ No newline at end of file
diff --git a/docs/en/couchbase_example.md b/docs/en/couchbase_example.md
new file mode 100644
index 0000000..6748580
--- /dev/null
+++ b/docs/en/couchbase_example.md
@@ -0,0 +1,1209 @@
+## Couchbase bRPC Binary Protocol Integration
+
+This document explains the Couchbase Binary Protocol support added to bRPC: the available high-level operations, collection support, SSL authentication, and how to run the provided example clients against either a local Couchbase Server cluster or a Couchbase Capella (cloud) deployment. Note that this binary protocol implementation currently lacks the fine-grained optimizations already present in the couchbase-cxx-client SDK, which also offers query support, better error handling, and more optimized and reliable operations. For that reason, a separate bRPC integration built on the couchbase-cxx-client SDK is available at [Couchbaselabs-cb-brpc](https://github.com/couchbaselabs/cb_brpc/tree/couchbase_sdk_brpc).
+
+---
+### 1. Overview
+
+The integration lets bRPC clients communicate with Couchbase Server over its Binary Protocol. The high-level `CouchbaseOperations` class provides a simplified interface on top of that protocol.
+
+The core pieces are:
+* `src/brpc/policy/couchbase_protocol.[h|cpp]` – framing + parse loop for binary responses, and request serialization.
+* `src/brpc/couchbase.[h|cpp]` – high-level `CouchbaseOperations` class with request (`CouchbaseRequest`) and response (`CouchbaseResponse`) builders, parsers and error-handlers.
+* `example/couchbase_c++/couchbase_client.cpp` – an end‑to‑end example using the high-level API for authentication, bucket selection, CRUD operations, and collection‑scoped operations.
+* `example/couchbase_c++/multithreaded_couchbase_client.cpp` – a multithreaded example in which one `CouchbaseOperations` instance is shared across threads operating on the same bucket. A second block of code shows threads that each own a `CouchbaseOperations` instance because they operate on different buckets.
+* `example/couchbase_c++/traditional_brpc_couchbase_client.cpp` – demonstrates the traditional bRPC approach with manual channel, controller, and request/response management for advanced users who need fine-grained control.
+
+Design goals:
+* **SSL Support**: Built-in SSL/TLS support for secure connections to Couchbase Capella.
+* **Per-instance Authentication**: Each `CouchbaseOperations` object maintains its own authenticated session when instances connect to different buckets. When multiple instances operate on the same bucket, they share a single TCP socket, because `connection_groups` are created per `server_name+bucket` combination.
+* **Collection Support**: Native support for collection-scoped operations.
+* Keep wire structs identical to the binary protocol (24‑byte header, network order numeric fields).
+* Future extensions for advanced features.
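+
+The sharing rule in the per-instance authentication goal can be pictured with a small standalone sketch. It is not the bRPC implementation: `Connection`, `ConnectionRegistry`, and the `server/bucket` key format are hypothetical stand-ins, used only to show that two handles for the same server and bucket resolve to one connection while a different bucket gets its own.
+
+```cpp
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+
+namespace sketch {
+
+// Stand-in for an authenticated socket (hypothetical type).
+struct Connection {
+    std::string group;
+};
+
+// Hands out one shared connection per "server/bucket" group.
+class ConnectionRegistry {
+ public:
+    std::shared_ptr<Connection> Acquire(const std::string& server,
+                                        const std::string& bucket) {
+        // Assumed key format; the source only says server_name+bucket.
+        const std::string group = server + "/" + bucket;
+        std::lock_guard<std::mutex> lock(mu_);
+        std::shared_ptr<Connection>& slot = by_group_[group];
+        if (!slot) {
+            slot = std::make_shared<Connection>(Connection{group});
+        }
+        return slot;
+    }
+
+ private:
+    std::mutex mu_;
+    std::map<std::string, std::shared_ptr<Connection>> by_group_;
+};
+
+}  // namespace sketch
+```
+
+A plain `std::mutex` is used here so the sketch also compiles as C++11.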
+
+---
+### 2. Features
+
+| Category | Supported Operations | Notes |
+|----------|----------------------|-------|
+| **High-Level API** | `CouchbaseOperations` class | **Recommended**: Simple methods returning `Result` struct |
+| **Traditional API** | Manual channel/controller management | **Advanced**: Direct bRPC access for custom configurations |
+| **SSL/TLS Support** | Built-in SSL encryption | **Required** for Couchbase Capella, optional for local clusters |
+| Authentication | SASL `PLAIN` with/without SSL | `authenticate()` for non-SSL, `authenticateSSL()` for SSL connections |
+| Bucket selection | Integrated with authentication | Bucket specified during authentication; `selectBucket()` also available separately |
+| Basic KV | `add()`, `upsert()`, `delete_()`, `get()` | Clean API with `Result` struct error handling |
+| **Pipeline Operations** | `beginPipeline()`, `pipelineRequest()`, `executePipeline()` | **NEW**: Batch multiple operations in single network call for improved performance |
+| Collections | Collection-scoped CRUD operations | Pass collection name as optional parameter (defaults to "_default") |
+| Error Handling | `Result.success` + `Result.error_message` + `Result.status_code` | Human-readable error messages with Couchbase status codes |
+
+- **Simplified**: No need to manage channels, controllers, or response parsing
+- **Flexible Threading**: Share instances across threads for same bucket/server, or create separate instances for different buckets/servers
+- **Error Handling**: Simple boolean success with descriptive error messages and error codes
+- **SSL Built-in**: SSL handling for secure connections
+
+
+---
+### 3. Binary Protocol Mapping
+
+The Couchbase binary protocol header is shown below; see the [original documentation](https://github.com/couchbase/kv_engine/blob/master/docs/BinaryProtocol.md) for details. This header format is used to communicate with Couchbase servers.
+```
+  Byte/     0       |       1       |       2       |       3       |
+     /              |               |               |               |
+    |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+    +---------------+---------------+---------------+---------------+
+   0| Magic         | Opcode        | Key length                    |
+    +---------------+---------------+---------------+---------------+
+   4| Extras length | Data type     | vbucket id                    |
+    +---------------+---------------+---------------+---------------+
+   8| Total body length                                             |
+    +---------------+---------------+---------------+---------------+
+  12| Opaque                                                        |
+    +---------------+---------------+---------------+---------------+
+  16| CAS                                                           |
+    |                                                               |
+    +---------------+---------------+---------------+---------------+
+    Total 24 bytes
+```
+
+Overall packet structure:
+```
+  Byte/     0       |       1       |       2       |       3       |
+     /              |               |               |               |
+    |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+    +---------------+---------------+---------------+---------------+
+   0| HEADER                                                        |
+    |                                                               |
+    |                                                               |
+    |                                                               |
+    +---------------+---------------+---------------+---------------+
+  24| COMMAND-SPECIFIC EXTRAS (as needed)                           |
+    |  (note length in the extras length header field)              |
+    +---------------+---------------+---------------+---------------+
+   m| Key (as needed)                                               |
+    |  (note length in key length header field)                     |
+    +---------------+---------------+---------------+---------------+
+   n| Value (as needed)                                             |
+    |  (note length is total body length header field, minus        |
+    |   sum of the extras and key length body fields)               |
+    +---------------+---------------+---------------+---------------+
+    Total 24 + x bytes (24 byte header, and x byte body)
+```
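+
+The layout above can be sketched in a few lines of standalone C++. This is an illustration only, not the code in `couchbase_protocol.cpp`: the `sketch` namespace, `BuildPacket`, and `AppendBigEndian` are hypothetical names, and the only protocol details assumed are the request magic `0x80`, the GET opcode `0x00`, and the big-endian 24-byte header from the diagram.
+
+```cpp
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+namespace sketch {
+
+constexpr uint8_t kMagicRequest = 0x80;  // request magic byte
+constexpr uint8_t kOpcodeGet = 0x00;     // GET opcode
+constexpr size_t kHeaderSize = 24;
+
+// Append an unsigned integer in network (big-endian) byte order.
+template <typename T>
+void AppendBigEndian(std::vector<uint8_t>* out, T value) {
+    for (int shift = (static_cast<int>(sizeof(T)) - 1) * 8; shift >= 0;
+         shift -= 8) {
+        out->push_back(static_cast<uint8_t>((value >> shift) & 0xFF));
+    }
+}
+
+// Serialize header + extras + key + value for one request.
+std::vector<uint8_t> BuildPacket(uint8_t opcode, uint16_t vbucket_id,
+                                 uint32_t opaque, uint64_t cas,
+                                 const std::string& extras,
+                                 const std::string& key,
+                                 const std::string& value) {
+    const size_t body_len = extras.size() + key.size() + value.size();
+    std::vector<uint8_t> pkt;
+    pkt.reserve(kHeaderSize + body_len);
+    pkt.push_back(kMagicRequest);                                 // byte 0
+    pkt.push_back(opcode);                                        // byte 1
+    AppendBigEndian<uint16_t>(&pkt, static_cast<uint16_t>(key.size()));
+    pkt.push_back(static_cast<uint8_t>(extras.size()));          // byte 4
+    pkt.push_back(0);                                             // data type: raw
+    AppendBigEndian<uint16_t>(&pkt, vbucket_id);                  // bytes 6-7
+    AppendBigEndian<uint32_t>(&pkt, static_cast<uint32_t>(body_len));
+    AppendBigEndian<uint32_t>(&pkt, opaque);                      // bytes 12-15
+    AppendBigEndian<uint64_t>(&pkt, cas);                         // bytes 16-23
+    pkt.insert(pkt.end(), extras.begin(), extras.end());
+    pkt.insert(pkt.end(), key.begin(), key.end());
+    pkt.insert(pkt.end(), value.begin(), value.end());
+    return pkt;
+}
+
+}  // namespace sketch
+```
+
+For example, `sketch::BuildPacket(sketch::kOpcodeGet, vbucket_id, opaque, 0, "", "sample_key", "")` yields 34 bytes: the 24-byte header followed by the 10-byte key.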
+
+---
+### 4. High-Level API (`CouchbaseOperations`)
+
+**Approach**: Use the `CouchbaseOperations` class for all operations. A single instance can be shared across threads that operate on the same bucket; when threads connect to different buckets, give each thread its own instance.
+
+#### Basic Usage:
+```cpp
+#include <brpc/couchbase.h>
+
+brpc::CouchbaseOperations couchbase_ops;
+
+// 1. Authenticate with bucket selection (REQUIRED for each instance)
+brpc::CouchbaseOperations::Result auth_result = couchbase_ops.authenticate(
+    username, password, server_address, bucket_name);
+if (!auth_result.success) {
+    LOG(ERROR) << "Auth failed: " << auth_result.error_message;
+    return -1;
+}
+
+// 2. Perform operations (bucket is already selected during authentication)
+brpc::CouchbaseOperations::Result add_result = couchbase_ops.add("user::123", json_value);
+if (add_result.success) {
+    std::cout << "Document added successfully!" << std::endl;
+} else {
+    std::cout << "Add failed: " << add_result.error_message << std::endl;
+}
+
+// Optional: Switch to a different bucket (if needed)
+// brpc::CouchbaseOperations::Result bucket_result = couchbase_ops.selectBucket("another_bucket");
+```
+
+#### SSL Authentication (Essential for Couchbase Capella):
+See the [Couchbase Capella documentation](https://docs.couchbase.com/cloud/security/security-certificates.html) for how to download the security certificate.
+```cpp
+// For Couchbase Capella (cloud) - SSL is REQUIRED
+brpc::CouchbaseOperations::Result auth_result = couchbase_ops.authenticateSSL(
+    username, 
+    password, 
+    "cluster.cloud.couchbase.com:11207",  // SSL port
+    bucket_name,                          // bucket name
+    "path/to/certificate.txt"             // certificate path (can be downloaded from the Capella UI)
+);
+```
+
+#### Collection Operations:
+```cpp
+// Default collection
+auto default_result = couchbase_ops.get("doc::1");
+
+// Specific collection (distinct variable names so both calls can share a scope)
+auto coll_result = couchbase_ops.get("doc::1", "my_collection");
+auto add_result = couchbase_ops.add("doc::2", value, "my_collection");
+```
+
+#### Pipeline Operations (Performance Optimization):
+The pipeline API batches multiple operations into a single network call, which reduces round trips for bulk operations.
+
+#### How Pipeline Operations Work
+
+1. **Begin Pipeline**: Start a new pipeline session
+2. **Add Operations**: Queue multiple operations without executing them
+3. **Execute Pipeline**: Send all operations in a single network call
+4. **Process Results**: Handle results in the same order as requests
+
+#### Pipeline API Methods
+
+| Method | Description | Usage |
+|--------|-------------|-------|
+| `beginPipeline()` | Start a new pipeline session | Must call before adding operations |
+| `pipelineRequest(op_type, key, value, collection)` | Add operation to pipeline | Supports all CRUD operations |
+| `executePipeline()` | Execute all queued operations | Returns `vector<Result>` in request order |
+| `clearPipeline()` | Clear pipeline without executing | Use for cleanup on errors |
+| `isPipelineActive()` | Check if pipeline is active | Returns `bool` |
+| `getPipelineSize()` | Get number of queued operations | Returns `size_t` |
+
+```cpp
+// Begin a new pipeline
+if (!couchbase_ops.beginPipeline()) {
+    LOG(ERROR) << "Failed to begin pipeline";
+    return -1;
+}
+
+// Add multiple operations to the pipeline (not executed yet)
+bool success = true;
+success &= couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::ADD, "key1", "value1");
+success &= couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::UPSERT, "key2", "value2");
+success &= couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::GET, "key1");
+success &= couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::DELETE, "key3");
+
+if (!success) {
+    couchbase_ops.clearPipeline();  // Clean up on error
+    return -1;
+}
+
+// Execute all operations in a single network call
+std::vector<brpc::CouchbaseOperations::Result> results = couchbase_ops.executePipeline();
+
+// Process results in the same order as requests
+for (size_t i = 0; i < results.size(); ++i) {
+    if (results[i].success) {
+        std::cout << "Operation " << i << " succeeded" << std::endl;
+        if (!results[i].value.empty()) {
+            std::cout << "Value: " << results[i].value << std::endl;
+        }
+    } else {
+        std::cout << "Operation " << i << " failed: " << results[i].error_message << std::endl;
+    }
+}
+```
+
+**Pipeline with Collections**:
+```cpp
+// Pipeline operations can also use collections
+couchbase_ops.beginPipeline();
+couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::ADD, "doc1", "value1", "my_collection");
+couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::GET, "doc1", "", "my_collection");
+auto results = couchbase_ops.executePipeline();
+```
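+
+The queue-then-execute behaviour described above can be modelled without a server. The following toy is a sketch under stated assumptions, not the `CouchbaseOperations` implementation: `ToyPipeline`, `ToyResult`, and `Op` are hypothetical names, an in-memory `std::map` stands in for the bucket, and refusing a second `Begin()` while a pipeline is active is an assumption about the intended semantics.
+
+```cpp
+#include <cstddef>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace sketch {
+
+enum class Op { kAdd, kUpsert, kGet, kDelete };
+
+struct ToyResult {
+    bool success;
+    std::string value;
+    std::string error_message;
+};
+
+class ToyPipeline {
+ public:
+    bool Begin() {
+        if (active_) return false;  // assumed: one pipeline at a time
+        active_ = true;
+        return true;
+    }
+    // Queue an operation; nothing touches the store yet.
+    bool Request(Op op, const std::string& key, const std::string& value = "") {
+        if (!active_) return false;
+        queue_.push_back(Queued{op, key, value});
+        return true;
+    }
+    // Apply every queued operation; results keep the request order.
+    std::vector<ToyResult> Execute() {
+        std::vector<ToyResult> results;
+        for (const Queued& q : queue_) results.push_back(Apply(q));
+        queue_.clear();
+        active_ = false;
+        return results;
+    }
+    bool IsActive() const { return active_; }
+    size_t Size() const { return queue_.size(); }
+
+ private:
+    struct Queued {
+        Op op;
+        std::string key;
+        std::string value;
+    };
+    ToyResult Apply(const Queued& q) {
+        std::map<std::string, std::string>::iterator it = store_.find(q.key);
+        switch (q.op) {
+            case Op::kAdd:
+                if (it != store_.end()) return ToyResult{false, "", "key exists"};
+                store_[q.key] = q.value;
+                return ToyResult{true, "", ""};
+            case Op::kUpsert:
+                store_[q.key] = q.value;
+                return ToyResult{true, "", ""};
+            case Op::kGet:
+                if (it == store_.end()) return ToyResult{false, "", "key not found"};
+                return ToyResult{true, it->second, ""};
+            case Op::kDelete:
+                if (it == store_.end()) return ToyResult{false, "", "key not found"};
+                store_.erase(it);
+                return ToyResult{true, "", ""};
+        }
+        return ToyResult{false, "", "unknown operation"};
+    }
+    bool active_ = false;
+    std::vector<Queued> queue_;
+    std::map<std::string, std::string> store_;
+};
+
+}  // namespace sketch
+```
+
+Because each result sits at the same index as its request, a caller can pair `results[i]` with the i-th queued operation without any extra bookkeeping.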
+
+#### Error Handling Pattern:
+```cpp
+brpc::CouchbaseOperations::Result result = couchbase_ops.someOperation(...);
+if (!result.success) {
+    // Handle error
+    LOG(ERROR) << "Operation failed: " << result.error_message;
+    LOG(ERROR) << "Error Code: " << result.status_code;  // Couchbase status code
+} else {
+    // Use result.value if applicable (for Get operations)
+    std::cout << "Retrieved value: " << result.value << std::endl;
+}
+```
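+
+To make the relationship between `success`, `error_message`, `status_code`, and `value` concrete, here is a minimal standalone sketch. `SketchResult`, `StatusText`, and `MakeResult` are hypothetical names, the field types are assumptions, and the real `Result` struct and its message strings may differ. The status values shown are taken from the Couchbase binary protocol.
+
+```cpp
+#include <cstdint>
+#include <string>
+
+namespace sketch {
+
+// Mirrors the field names used in this document; types are assumed.
+struct SketchResult {
+    bool success = false;
+    std::string error_message;
+    uint16_t status_code = 0;
+    std::string value;
+};
+
+// Map a few binary protocol status codes to readable text.
+inline const char* StatusText(uint16_t status) {
+    switch (status) {
+        case 0x0000: return "Success";
+        case 0x0001: return "Key not found";
+        case 0x0002: return "Key exists";
+        case 0x0020: return "Authentication error";
+        case 0x0088: return "Unknown collection";
+        default:     return "Unmapped status";
+    }
+}
+
+// Fold a status code and payload into one result object.
+inline SketchResult MakeResult(uint16_t status, const std::string& payload) {
+    SketchResult r;
+    r.status_code = status;
+    r.success = (status == 0x0000);
+    if (r.success) {
+        r.value = payload;  // only meaningful for reads such as GET
+    } else {
+        r.error_message = StatusText(status);
+    }
+    return r;
+}
+
+}  // namespace sketch
+```
+
+Keeping the numeric code next to the message lets callers branch on `status_code` (for example, treating "key exists" after an ADD as expected) while still logging readable text.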
+
+---
+### 5. Traditional bRPC Couchbase Client (`traditional_brpc_couchbase_client.cpp`)
+
+For developers who need fine-grained control over the bRPC framework or want to understand the low-level implementation, we provide a traditional bRPC client example. This approach requires manual management of channels, controllers, and response parsing.
+
+**When to use Traditional API:**
+- Advanced bRPC users who need custom channel configurations
+- Fine-grained control over connection pooling and retry logic
+- Direct access to underlying bRPC controller for debugging
+- Learning the internal workings of the high-level API
+
+**When to use High-Level API (Recommended):**
+- Standard CRUD operations and authentication
+- Simpler error handling and cleaner code
+- Collection based operations with minimal boilerplate
+- Pipeline operations for batch processing (also possible with the traditional approach, but easier with the high-level API)
+
+#### Traditional Client Example Walkthrough
+
+The traditional client (`example/couchbase_c++/traditional_brpc_couchbase_client.cpp`) demonstrates the low-level bRPC approach:
+
+**1. Channel Setup and Configuration**
+```cpp
+brpc::Channel channel;
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_COUCHBASE;    // Set Couchbase protocol
+options.connection_type = "single";              // Single persistent connection
+options.timeout_ms = 1000;                       // 1 second timeout
+options.max_retry = 3;                          // Retry up to 3 times
+
+if (channel.Init("localhost:11210", &options) != 0) {
+    LOG(ERROR) << "Failed to initialize channel";
+    return -1;
+}
+```
+
+**2. Authentication with Manual Request/Response Handling**
+```cpp
+brpc::Controller cntl;
+brpc::CouchbaseOperations::CouchbaseRequest req;
+brpc::CouchbaseOperations::CouchbaseResponse res;
+uint64_t cas;
+
+// Build authentication request
+req.authenticateRequest("Administrator", "password");
+
+// Execute the request
+channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+
+// Check controller status
+if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to authenticate: " << cntl.ErrorText();
+    return -1;
+}
+
+// Parse response - must call popHello() and popAuthenticate() in order
+if (res.popHello(&cas) && res.popAuthenticate(&cas)) {
+    std::cout << "Authentication Successful" << std::endl;
+} else {
+    std::cout << "Authentication Failed with status code: " 
+              << std::hex << res._status_code << std::endl;
+    return -1;
+}
+```
+
+**3. Bucket Selection**
+```cpp
+// IMPORTANT: Reset controller and clear request/response before each operation
+cntl.Reset();
+req.Clear();
+res.Clear();
+
+// Build bucket selection request
+req.selectBucketRequest("testing");
+
+// Execute the request
+channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+
+if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to select bucket: " << cntl.ErrorText();
+    return -1;
+}
+
+// Parse response - status_code only updated AFTER calling pop function
+if (res.popSelectBucket(&cas)) {
+    std::cout << "Bucket Selection Successful" << std::endl;
+} else {
+    std::cout << "Bucket Selection Failed with status code: " 
+              << std::hex << res._status_code << std::endl;
+    std::cout << "Error Message: " << res.lastError() << std::endl;
+    return -1;
+}
+```
+
+**4. ADD Operation (Create Document)**
+```cpp
+// Reset for new operation
+cntl.Reset();
+req.Clear();
+res.Clear();
+
+// Build ADD request
+req.addRequest(
+    "sample_key",                                           // key
+    R"({"name": "John Doe", "age": 30, "email": "john@example.com"})",  // value
+    0,      // flags
+    0,      // exptime (0 = no expiration)
+    0       // cas (0 for new document)
+);
+
+// Execute the request
+channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+
+if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to add key-value: " << cntl.ErrorText();
+    return -1;
+}
+
+// Parse response
+if (res.popAdd(&cas)) {
+    std::cout << "Key-Value Addition Successful" << std::endl;
+} else {
+    std::cout << "Key-Value Addition Failed with status code: " 
+              << std::hex << res._status_code << std::endl;
+    std::cout << "Error Message: " << res.lastError() << std::endl;
+    return -1;
+}
+```
+
+**5. GET Operation (Retrieve Document)**
+```cpp
+// Reset for new operation
+cntl.Reset();
+req.Clear();
+res.Clear();
+
+// Build GET request
+req.getRequest("sample_key");
+
+// Execute the request
+channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+
+if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to get value for key: " << cntl.ErrorText();
+    return -1;
+}
+
+// Parse response - GET returns value and flags
+std::string value;
+uint32_t flags;
+if (res.popGet(&value, &flags, &cas)) {
+    std::cout << "Key-Value Retrieval Successful" << std::endl;
+    std::cout << "Retrieved Value: " << value << std::endl;
+} else {
+    std::cout << "Key-Value Retrieval Failed with status code: " 
+              << std::hex << res._status_code << std::endl;
+    std::cout << "Error Message: " << res.lastError() << std::endl;
+    return -1;
+}
+```
+
+**6. DELETE Operation (Remove Document)**
+```cpp
+// Reset for new operation
+cntl.Reset();
+req.Clear();
+res.Clear();
+
+// Build DELETE request
+req.deleteRequest("sample_key");
+
+// Execute the request
+channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+
+if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to delete key-value: " << cntl.ErrorText();
+    return -1;
+}
+
+// Parse response
+if (res.popDelete()) {
+    std::cout << "Key-Value Deletion Successful" << std::endl;
+} else {
+    std::cout << "Key-Value Deletion Failed with status code: " 
+              << std::hex << res._status_code << std::endl;
+    std::cout << "Error Message: " << res.lastError() << std::endl;
+    return -1;
+}
+```
+
+#### Key Differences: Traditional vs High-Level API
+
+| Aspect | Traditional API | High-Level API |
+|--------|----------------|----------------|
+| **Setup** | Manual channel, controller, request/response management | Single `CouchbaseOperations` instance |
+| **Error Handling** | Check both `cntl.Failed()` and response status | Simple `Result.success` boolean |
+| **Resource Management** | Must call `cntl.Reset()`, `req.Clear()`, `res.Clear()` | Automatic |
+| **Response Parsing** | Manual `pop*()` calls with CAS handling | Transparent |
+| **Code Verbosity** | ~15-20 lines per operation | ~2-3 lines per operation |
+| **Collections** | Manual collection ID retrieval and management | Automatic with collection name parameter |
+| **Pipeline Operations** | Complex manual request building | Simple `beginPipeline()`, `pipelineRequest()`, `executePipeline()` |
+| **SSL Support** | Manual SSL configuration in channel options | Built-in `authenticateSSL()` method |
+| **Threading** | Manual connection pooling management | Automatic connection group management |
+
+---
+### 6. Request/Response Classes (`CouchbaseRequest`/`CouchbaseResponse`)
+
+These classes are public in `CouchbaseOperations` and can be used in advanced bRPC programs. The high-level API uses them internally, and the traditional client example demonstrates their direct usage. `CouchbaseRequest` builds the requests sent over the channel, and `CouchbaseResponse` parses the responses received from it.
+
+
+#### Response Parsing:
+Each `pop*` method consumes the front of the internal response buffer, validating:
+1. Header present.
+2. Opcode matches expected operation.
+3. Status == success (otherwise `_err` filled with formatted message).
+4. Body length sufficient.
+
+---
+### 7. Example Client Walkthrough
+
+#### Single-Threaded Example (`couchbase_client.cpp`)
+Uses the **high-level `CouchbaseOperations` API**:
+
+1. **Create a `CouchbaseOperations` instance** - a thread can create more than one.
+```cpp
+brpc::CouchbaseOperations couchbase_ops;
+```
+
+2. **Prompt for credentials** - username/password for authentication.
+```cpp
+// Defaults used by the example; the prompt below only runs if either value is empty.
+std::string username = "Administrator";
+std::string password = "password";
+while (username.empty() || password.empty()) {
+    std::cout << "Enter Couchbase username: ";
+    std::cin >> username;
+    std::cout << "Enter Couchbase password: ";
+    std::cin >> password;
+}
+```
+
+3. **Authentication with bucket selection** - `authenticate()` for local, `authenticateSSL()` for Capella.
+
+**Function Signatures:**
+```cpp
+// Non-SSL authentication
+Result authenticate(const string& username,     // Couchbase username 
+                   const string& password,     // Couchbase password
+                   const string& server_address, // Server host:port (e.g., "localhost:11210")
+                   const string& bucket_name);   // Target bucket name
+
+// SSL authentication  
+Result authenticateSSL(const string& username,     // Couchbase username
+                      const string& password,     // Couchbase password  
+                      const string& server_address, // Server host:port (e.g., "cluster.cloud.couchbase.com:11207")
+                      const string& bucket_name,   // Target bucket name
+                      string path_to_cert);        // Path to SSL certificate file
+```
+
+**Usage Examples:**
+```cpp
+// For local Couchbase (non-SSL)
+brpc::CouchbaseOperations::Result auth_result = 
+    couchbase_ops.authenticate(username, password, FLAGS_server, "testing");
+
+// For Couchbase Capella (SSL)
+// brpc::CouchbaseOperations::Result auth_result = 
+//     couchbase_ops.authenticateSSL(username, password, "cluster.cloud.couchbase.com:11207", 
+//                                   "bucket_name", "path/to/cert.txt");
+
+if (!auth_result.success) {
+    LOG(ERROR) << "Authentication failed: " << auth_result.error_message;
+    return -1;
+}
+```
+
+4. **Basic CRUD operations**:
+   - Add document (should succeed)
+   - Try adding same key again (should fail with "key exists")
+   - Get document (retrieve the added document)
+
+**Function Signatures:**
+```cpp
+// ADD operation - creates new document, fails if key exists
+Result add(const string& key,                    // Document key/ID
+          const string& value,                  // Document value (JSON string)
+          string collection_name = "_default"); // Collection name (optional, defaults to "_default")
+
+// GET operation - retrieves document by key
+Result get(const string& key,                    // Document key/ID to retrieve
+          string collection_name = "_default"); // Collection name (optional, defaults to "_default")
+```
+
+**Usage Examples:**
+```cpp
+std::string add_key = "user::test_brpc_binprot";
+std::string add_value = R"({"name": "John Doe", "age": 30, "email": "john@example.com"})";
+
+// First ADD operation (should succeed)
+brpc::CouchbaseOperations::Result add_result = couchbase_ops.add(add_key, add_value);
+if (add_result.success) {
+    std::cout << "ADD operation successful" << std::endl;
+} else {
+    std::cout << "ADD operation failed: " << add_result.error_message << std::endl;
+}
+
+// Second ADD operation (should fail - key exists)
+brpc::CouchbaseOperations::Result add_result2 = couchbase_ops.add(add_key, add_value);
+if (!add_result2.success) {
+    std::cout << "Second ADD failed as expected: " << add_result2.error_message << std::endl;
+}
+
+// GET operation
+brpc::CouchbaseOperations::Result get_result = couchbase_ops.get(add_key);
+if (get_result.success) {
+    std::cout << "GET operation successful" << std::endl;
+    std::cout << "GET value: " << get_result.value << std::endl;
+}
+```
+
+5. **Multiple document operations** - Add several documents with different keys.
+```cpp
+std::string item1_key = "binprot_item1";
+std::string item2_key = "binprot_item2";
+std::string item3_key = "binprot_item3";
+
+couchbase_ops.add(item1_key, add_value);
+couchbase_ops.add(item2_key, add_value);
+couchbase_ops.add(item3_key, add_value);
+```
+
+6. **Upsert operations**:
+   - Upsert existing document (should update)
+   - Upsert new document (should create)
+   - Verify with Get operations
+
+**Function Signature:**
+```cpp
+// UPSERT operation - creates new document or updates existing one
+Result upsert(const string& key,                    // Document key/ID
+             const string& value,                  // Document value (JSON string)
+             string collection_name = "_default"); // Collection name (optional, defaults to "_default")
+```
+
+**Usage Examples:**
+```cpp
+std::string upsert_key = "upsert_test";
+std::string upsert_value = R"({"operation": "upsert", "version": 1})";
+
+// Upsert new document (will create)
+brpc::CouchbaseOperations::Result upsert_result = couchbase_ops.upsert(upsert_key, upsert_value);
+
+// Upsert existing document (will update)
+std::string updated_value = R"({"operation": "upsert", "version": 2})";
+brpc::CouchbaseOperations::Result update_result = couchbase_ops.upsert(upsert_key, updated_value);
+
+// Verify with GET
+brpc::CouchbaseOperations::Result check_result = couchbase_ops.get(upsert_key);
+```
+
+7. **Delete operations**:
+   - Delete non-existent key (should fail gracefully)
+   - Delete existing key (should succeed)
+
+**Function Signature:**
+```cpp
+// DELETE operation - removes document by key
+Result delete_(const string& key,                   // Document key/ID to delete
+              string collection_name = "_default"); // Collection name (optional, defaults to "_default")
+```
+
+**Usage Examples:**
+```cpp
+// Delete non-existent key
+std::string delete_key = "non_existent_key";
+brpc::CouchbaseOperations::Result delete_result = couchbase_ops.delete_(delete_key);
+if (!delete_result.success) {
+    std::cout << "Delete failed as expected: " << delete_result.error_message << std::endl;
+}
+
+// Delete existing key
+std::string delete_existing_key = "binprot_item1";
+brpc::CouchbaseOperations::Result delete_existing_result = couchbase_ops.delete_(delete_existing_key);
+if (delete_existing_result.success) {
+    std::cout << "Delete existing key successful" << std::endl;
+}
+```
+
+8. **Collection-scoped operations** - Add/Get/Upsert/Delete in specific collections.
+
+**Note:** All CRUD operations support an optional collection parameter. When not specified, operations default to the "_default" collection.
+
+**Usage Examples:**
+```cpp
+std::string collection_name = "testing_collection";  // Target collection name
+std::string coll_key = "collection::doc1";           // Document key
+std::string coll_value = R"({"collection_operation": "add", "scope": "custom"})";  // Document value
+
+// Collection-scoped ADD (key, value, collection_name)
+brpc::CouchbaseOperations::Result coll_add_result = 
+    couchbase_ops.add(coll_key, coll_value, collection_name);
+
+// Collection-scoped GET (key, collection_name)
+brpc::CouchbaseOperations::Result coll_get_result = 
+    couchbase_ops.get(coll_key, collection_name);
+
+// Collection-scoped UPSERT (key, value, collection_name)
+brpc::CouchbaseOperations::Result coll_upsert_result = 
+    couchbase_ops.upsert(coll_key, coll_value, collection_name);
+
+// Collection-scoped DELETE (key, collection_name)
+brpc::CouchbaseOperations::Result coll_delete_result = 
+    couchbase_ops.delete_(coll_key, collection_name);
+```
+
+9. **Pipeline operations demo**:
+   - Begin pipeline and add multiple operations
+   - Execute batch operations in single network call
+   - Process results in order
+   - Collection-scoped pipeline operations
+   - Error handling and cleanup
+
+**Function Signatures:**
+```cpp
+// Pipeline management functions
+bool beginPipeline();                               // Start a new pipeline session
+
+bool pipelineRequest(operation_type op_type,        // Operation type (ADD, UPSERT, GET, DELETE, etc.)
+                    const string& key,             // Document key/ID
+                    const string& value = "",       // Document value (empty for GET/DELETE operations)
+                    string collection_name = "_default"); // Collection name (optional)
+
+vector<Result> executePipeline();                   // Execute all queued operations and return results
+
+bool clearPipeline();                              // Clear pipeline without executing (cleanup)
+
+// Pipeline status functions
+bool isPipelineActive() const;                      // Check if pipeline is active
+size_t getPipelineSize() const;                    // Get number of queued operations
+```
+
+**Usage Examples:**
+```cpp
+// Begin pipeline
+if (!couchbase_ops.beginPipeline()) {
+    std::cout << "Failed to begin pipeline" << std::endl;
+    return -1;
+}
+
+// Add multiple operations to pipeline
+std::string pipeline_key1 = "pipeline::doc1";
+std::string pipeline_key2 = "pipeline::doc2";
+std::string pipeline_value1 = R"({"operation": "pipeline_add", "id": 1})";
+std::string pipeline_value2 = R"({"operation": "pipeline_upsert", "id": 2})";
+
+bool pipeline_success = true;
+// pipelineRequest(operation_type, key, value, collection_name)
+pipeline_success &= couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::ADD, pipeline_key1, pipeline_value1);
+pipeline_success &= couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::UPSERT, pipeline_key2, pipeline_value2);
+pipeline_success &= couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::GET, pipeline_key1);  // Empty value for GET
+
+if (!pipeline_success) {
+    couchbase_ops.clearPipeline();  // Clean up on error
+    return -1;
+}
+
+// Execute pipeline - returns results in same order as requests
+std::vector<brpc::CouchbaseOperations::Result> pipeline_results = couchbase_ops.executePipeline();
+
+// Process results
+for (size_t i = 0; i < pipeline_results.size(); ++i) {
+    if (pipeline_results[i].success) {
+        std::cout << "Operation " << (i + 1) << " SUCCESS";
+        if (!pipeline_results[i].value.empty()) {
+            std::cout << " - Value: " << pipeline_results[i].value;
+        }
+        std::cout << std::endl;
+    } else {
+        std::cout << "Operation " << (i + 1) << " FAILED: " 
+                  << pipeline_results[i].error_message << std::endl;
+    }
+}
+
+// Collection-scoped pipeline operations
+if (couchbase_ops.beginPipeline()) {
+    // pipelineRequest(operation_type, key, value, collection_name)
+    couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::ADD, "coll_pipeline::doc1", 
+                                  R"({"collection_operation": "pipeline_add", "id": 1})", collection_name);
+    couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::GET, "coll_pipeline::doc1", "", collection_name);
+    auto coll_results = couchbase_ops.executePipeline();
+}
+```
+
+10. **Bucket switching** - Demonstrate changing bucket selection.
+
+**Function Signature:**
+```cpp
+// SELECTBUCKET operation - switch to a different bucket on the same server
+Result selectBucket(const string& bucket_name);    // Target bucket name to switch to
+```
+
+**Usage Example:**
+```cpp
+std::string bucket_name = "testing";
+std::cout << "Enter Couchbase bucket name: ";
+std::cin >> bucket_name;
+
+// selectBucket(bucket_name) - switches to the specified bucket
+brpc::CouchbaseOperations::Result bucket_result = couchbase_ops.selectBucket(bucket_name);
+if (!bucket_result.success) {
+    LOG(ERROR) << "Bucket selection failed: " << bucket_result.error_message;
+    return -1;
+} else {
+    std::cout << "Bucket Selection Successful" << std::endl;
+}
+
+// Perform operations on new bucket
+performOperations(couchbase_ops);
+```
+
+#### Multithreaded Example (`multithreaded_couchbase_client.cpp`)
+Demonstrates:
+- **20 bthreads** (5 threads per bucket across 4 buckets)
+- **Multiple threading patterns**: Each thread can create its own instance or share instances
+- **Concurrent operations** across multiple buckets and collections
+- **Thread-safe statistics tracking** for operations
+- **Collection-scoped operations** across threads
+
+**Global Configuration**:
+```cpp
+const int NUM_THREADS = 20;
+const int THREADS_PER_BUCKET = 5;
+
+// Global config structure
+struct {
+    std::string username = "Administrator";
+    std::string password = "password";
+    std::vector<std::string> bucket_names = {"t0", "t1", "t2", "t3"};
+} g_config;
+
+// Thread statistics tracking
+struct ThreadStats {
+    std::atomic<int> operations_attempted{0};
+    std::atomic<int> operations_successful{0};
+    std::atomic<int> operations_failed{0};
+};
+
+struct GlobalStats {
+    ThreadStats total;
+    std::vector<ThreadStats> per_thread_stats;
+    GlobalStats() : per_thread_stats(NUM_THREADS) {}
+} g_stats;
+```
+
+**Thread Worker Function**:
+```cpp
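+struct ThreadArgs {
+    int thread_id;
+    int bucket_id;
+    std::string bucket_name;
+    ThreadStats* stats;
+    // Used only by the shared-instance pattern; stays NULL when each thread creates its own instance
+    brpc::CouchbaseOperations* couchbase_ops = NULL;
+};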
+
+void* thread_worker(void* arg) {
+    ThreadArgs* args = static_cast<ThreadArgs*>(arg);
+    
+    // Create CouchbaseOperations instance for this thread
+    brpc::CouchbaseOperations couchbase_ops;
+    
+    // Authentication with assigned bucket
+    brpc::CouchbaseOperations::Result auth_result = couchbase_ops.authenticate(
+        g_config.username, g_config.password, "127.0.0.1:11210", args->bucket_name);
+    
+    // For SSL authentication:
+    // brpc::CouchbaseOperations::Result auth_result = couchbase_ops.authenticateSSL(
+    //     g_config.username, g_config.password, "127.0.0.1:11207", args->bucket_name, "/path/to/cert.txt");
+    
+    if (!auth_result.success) {
+        std::cout << "Thread " << args->thread_id << ": Auth failed - " 
+                  << auth_result.error_message << std::endl;
+        return NULL;
+    }
+    
+    // Perform CRUD operations on default collection
+    std::string base_key = "thread_" + std::to_string(args->thread_id);
+    perform_crud_operations_default(couchbase_ops, base_key, args->stats);
+    
+    // Perform collection-scoped operations
+    perform_crud_operations_collection(couchbase_ops, base_key, "my_collection", args->stats);
+    
+    return NULL;
+}
+```
+
+**CRUD Operations Functions**:
+```cpp
+void perform_crud_operations_default(brpc::CouchbaseOperations& couchbase_ops,
+                                   const std::string& base_key, ThreadStats* stats) {
+    std::string key = base_key + "_default";
+    std::string value = R"({"key": ")" + key + R"(", "collection": "default"})";
+    
+    stats->operations_attempted++;
+    
+    // UPSERT operation
+    brpc::CouchbaseOperations::Result result = couchbase_ops.upsert(key, value);
+    if (result.success) {
+        stats->operations_successful++;
+    } else {
+        stats->operations_failed++;
+    }
+    
+    // GET operation
+    stats->operations_attempted++;
+    result = couchbase_ops.get(key);
+    if (result.success) {
+        stats->operations_successful++;
+    } else {
+        stats->operations_failed++;
+    }
+    
+    // DELETE operation
+    stats->operations_attempted++;
+    result = couchbase_ops.delete_(key);
+    if (result.success) {
+        stats->operations_successful++;
+    } else {
+        stats->operations_failed++;
+    }
+}
+
+void perform_crud_operations_collection(brpc::CouchbaseOperations& couchbase_ops,
+                                      const std::string& base_key,
+                                      const std::string& collection_name,
+                                      ThreadStats* stats) {
+    std::string key = base_key + "_collection";
+    std::string value = R"({"key": ")" + key + R"(", "collection": ")" + collection_name + R"("})";
+    
+    // Collection-scoped operations
+    stats->operations_attempted++;
+    brpc::CouchbaseOperations::Result result = couchbase_ops.upsert(key, value, collection_name);
+    if (result.success) {
+        stats->operations_successful++;
+    } else {
+        stats->operations_failed++;
+    }
+    
+    stats->operations_attempted++;
+    result = couchbase_ops.get(key, collection_name);
+    if (result.success) {
+        stats->operations_successful++;
+    } else {
+        stats->operations_failed++;
+    }
+}
+```
+
+**Main Function - Thread Management**:
+```cpp
+int main(int argc, char* argv[]) {
+    std::vector<bthread_t> threads(NUM_THREADS);
+    std::vector<ThreadArgs> thread_args(NUM_THREADS);
+    
+    // Create threads - 5 threads per bucket across 4 buckets
+    for (int i = 0; i < NUM_THREADS; ++i) {
+        thread_args[i].thread_id = i;
+        thread_args[i].bucket_id = i / THREADS_PER_BUCKET;
+        thread_args[i].bucket_name = g_config.bucket_names[thread_args[i].bucket_id];
+        thread_args[i].stats = &g_stats.per_thread_stats[i];
+        
+        if (bthread_start_background(&threads[i], NULL, thread_worker, &thread_args[i]) != 0) {
+            LOG(ERROR) << "Failed to create thread " << i;
+            return -1;
+        }
+    }
+    
+    // Wait for all threads to complete
+    for (int i = 0; i < NUM_THREADS; ++i) {
+        bthread_join(threads[i], NULL);
+    }
+    
+    // Aggregate and display statistics
+    g_stats.aggregate_stats();
+    std::cout << "Total operations attempted: " << g_stats.total.operations_attempted.load() << std::endl;
+    std::cout << "Total operations successful: " << g_stats.total.operations_successful.load() << std::endl;
+    std::cout << "Total operations failed: " << g_stats.total.operations_failed.load() << std::endl;
+    
+    return 0;
+}
+```
+
+**Alternative Pattern - Shared Instance Demo**:
+```cpp
+// Shared instance worker function
+void* shared_object_thread_worker(void *arg) {
+    ThreadArgs* shared_args = static_cast<ThreadArgs*>(arg);
+    brpc::CouchbaseOperations* shared_couchbase_ops = shared_args->couchbase_ops;
+    
+    // Perform operations - 10 times on default collection, 10 times on col1 collection
+    for (int i = 0; i < 10; ++i) {
+        std::string base_key = butil::string_printf("shared_thread_op_%d_thread_id_%d", 
+                                                   i, shared_args->thread_id);
+        
+        // CRUD operations on default collection using shared instance
+        perform_crud_operations_default(*shared_couchbase_ops, base_key, shared_args->stats);
+        
+        // CRUD operations on col1 collection using shared instance
+        perform_crud_operations_collection(*shared_couchbase_ops, base_key, "col1", shared_args->stats);
+        
+        // Small delay between operations
+        bthread_usleep(10000);  // 10ms
+    }
+    return NULL;
+}
+
+// Main function demonstrates shared instance pattern
+int main_shared_demo() {
+    // Create a shared CouchbaseOperations instance
+    brpc::CouchbaseOperations shared_couchbase_ops;
+    brpc::CouchbaseOperations::Result result;
+    
+    // Authenticate shared instance
+    result = shared_couchbase_ops.authenticate(
+        g_config.username, g_config.password, "127.0.0.1:11210", "t0");
+    
+    if (result.success) {
+        std::cout << GREEN << "Shared CouchbaseOperations instance authenticated successfully!" 
+                  << RESET << std::endl;
+    } else {
+        std::cout << RED << "Shared CouchbaseOperations instance authentication failed: " 
+                  << result.error_message << RESET << std::endl;
+        return -1;
+    }
+    
+    // Configure all threads to use the shared instance
+    std::vector<bthread_t> threads(NUM_THREADS);
+    std::vector<ThreadArgs> args(NUM_THREADS);
+    
+    for (int i = 0; i < NUM_THREADS; ++i) {
+        args[i].thread_id = i;
+        args[i].couchbase_ops = &shared_couchbase_ops;  // Point to shared instance
+        args[i].bucket_id = 0;
+        args[i].bucket_name = "t0";  // All threads use same bucket via shared instance
+        args[i].stats = &g_stats.per_thread_stats[i];
+    }
+    
+    // Start all threads using shared instance
+    for (int i = 0; i < NUM_THREADS; ++i) {
+        if (bthread_start_background(&threads[i], NULL, shared_object_thread_worker, &args[i]) != 0) {
+            std::cout << RED << "Failed to create shared object thread " << i << RESET << std::endl;
+            return -1;
+        }
+    }
+    
+    // Wait for all threads to complete
+    for (int i = 0; i < NUM_THREADS; ++i) {
+        bthread_join(threads[i], NULL);
+    }
+    
+    std::cout << GREEN << "All shared object threads completed!" << RESET << std::endl;
+    return 0;
+}
+```
+
+Key features:
+- Demonstrates different connection patterns for multithreaded scenarios
+- Shows concurrent access to different buckets and collections
+- Proper resource management in multithreaded environments
+- Statistics tracking across all threads
+- Both separate instance and shared instance patterns
+
+---
+### 8. Building and Running the Examples
+
+#### Build all three examples:
+```bash
+cd example/couchbase_c++/
+make
+```
+
+#### Run Single-Threaded Example (High-Level API):
+```bash
+./couchbase_client
+```
+
+#### Run Multithreaded Example (High-Level API):
+```bash
+./multithreaded_couchbase_client
+```
+
+#### Run Traditional bRPC Client (Low-Level API):
+```bash
+./traditional_brpc_couchbase_client
+```
+
+---
+### 9. Setting Up Couchbase
+
+#### A. Local Install (Non‑Docker)
+Download Couchbase Server (Community or Enterprise) from https://www.couchbase.com/downloads/ and install it.
+
+Setup steps:
+- Open http://localhost:8091 in a browser and follow the setup wizard
+- Set admin credentials (Administrator / password)
+- Accept terms, choose services (Data, Query, Index at minimum)
+- Initialize cluster
+- Create a bucket (e.g. travel-sample or custom)
+
+Create collections (7.0+):
+- Navigate: Buckets → Your Bucket → Scopes & Collections
+- Add a Scope (optional) or use `_default`
+- Add a Collection (e.g. `testing_collection`)
+
+**Local Connection (SSL Is Optional for Local Installs)**:
+```cpp
+// Local without SSL - authenticate with bucket selection
+auto result = couchbase_ops.authenticate(username, password, "localhost:11210", bucket_name);
+```
+
+#### B. Couchbase Capella (Cloud) - **SSL Required**
+1. Sign up / log in: https://cloud.couchbase.com/
+2. Create a Free Trial or Hosted Cluster
+3. Create a bucket (or load sample dataset)
+4. **Create database access credentials** with appropriate RBAC roles:
+   - Data Reader/Writer (minimum)
+   - Bucket Admin (for bucket operations)
+5. **Download SSL Certificate**:
+   - Go to Cluster → Connect → Download Certificate
+   - Save as `couchbase-cloud-cert.pem` in your project directory
+6. **Get connection endpoint**:
+   - Use the **KV endpoint** (port 11207 for SSL)
+   - Format: `your-cluster-id.cloud.couchbase.com:11207`
+
+**Capella SSL Authentication Example**:
+```cpp
+// Couchbase Capella - SSL is MANDATORY
+auto result = couchbase_ops.authenticateSSL(
+    "your_username", 
+    "your_password", 
+    "your-cluster.cloud.couchbase.com:11207",    // SSL port
+    "your_bucket_name",                          // bucket name
+    "couchbase-cloud-cert.pem"                   // certificate file
+);
+```
+
+**Important Notes for Capella**:
+- **SSL is mandatory** - connections without SSL will fail
+- Use port **11207** (SSL) instead of 11210 (non-SSL)
+- Certificate verification is required for security
+- Ensure firewall allows outbound connections on port 11207
+
+---
+
+### 10. Error Handling Patterns
+
+#### High-Level API (Recommended)
+The `CouchbaseOperations` class uses a simple `Result` struct:
+
+```cpp
+struct Result {
+    bool success;           // true if operation succeeded
+    string error_message;   // human-readable error description
+    string value;          // returned value (for Get operations)
+    uint16_t status_code;   // Couchbase status code (0x00 if success)
+};
+```
+
+**Recommended Pattern**:
+```cpp
+auto result = couchbase_ops.add("key", "value");
+if (!result.success) {
+    LOG(ERROR) << "Add failed: " << result.error_message;
+    LOG(ERROR) << "Status code: " << result.status_code;
+    // Handle error appropriately
+} else {
+    std::cout << "Add succeeded!" << std::endl;
+}
+
+// For Get operations, read `value` only after checking `success`
+auto get_result = couchbase_ops.get("key");
+if (get_result.success) {
+    std::cout << "Retrieved: " << get_result.value << std::endl;
+} else {
+    LOG(ERROR) << "Get failed: " << get_result.error_message;
+    LOG(ERROR) << "Status code: " << get_result.status_code;
+}
+```
+
+---
+### 11. Best Practices
+
+#### Threading Patterns
+> **💡 FLEXIBLE THREADING OPTIONS**
+> - **Same bucket/server**: Share a single `CouchbaseOperations` instance across threads
+> - **Different buckets**: Create separate instances for each bucket within the same server
+> - **Different servers**: Create separate instances for each server connection
+> - **Connection isolation**: Each instance uses unique connection groups based on server+bucket combination
+
+#### SSL Security
+- **Always use SSL for Couchbase Capella** (cloud deployments)
+- **Verify certificates** - don't disable certificate validation in production
+- **Use port 11207** for SSL connections
+- **Store certificates securely** and update them when they expire
+
+#### Performance
+- **Reuse `CouchbaseOperations` instances** - they maintain persistent connections
+- **Use pipeline operations for bulk operations** 
+- **Pipeline operations preserve order** - results correspond to request order
+
+#### Threading Examples
+```cpp
+// Option 1: Shared instance for same bucket
+brpc::CouchbaseOperations shared_ops;
+shared_ops.authenticate(username, password, server_address, bucket_name);
+
+void worker_thread_1() {
+    shared_ops.add("key1", "value1");  // Safe to share
+}
+void worker_thread_2() {
+    shared_ops.get("key2");  // Safe to share
+}
+
+// Option 2: Separate instances for different buckets
+brpc::CouchbaseOperations ops_bucket1;
+brpc::CouchbaseOperations ops_bucket2;
+ops_bucket1.authenticate(username, password, server_address, "bucket1");
+ops_bucket2.authenticate(username, password, server_address, "bucket2");
+
+// Option 3: Separate instances for different servers
+brpc::CouchbaseOperations ops_server1;
+brpc::CouchbaseOperations ops_server2;
+ops_server1.authenticate(username, password, "server1:11210", bucket_name);
+ops_server2.authenticate(username, password, "server2:11210", bucket_name);
+```
+
+---
+### 12. Summary and References
+This implementation provides high-level APIs for Couchbase KV operations. Couchbase (the company) contributed to this implementation, but it is not officially supported; it is "[Community Supported](https://docs.couchbase.com/server/current/third-party/integrations.html#support-model)".
+
+---
+
+## 💡 **THREADING USAGE PATTERNS** 💡
+> 
+> **✅ PATTERN 1: Share one instance when multiple threads operate on the same bucket**
+> ```cpp
+> brpc::CouchbaseOperations shared_ops;
+> shared_ops.authenticate(username, password, "server:11210", "my_bucket");
+> 
+> void worker_thread_1() {
+>     shared_ops.add("key1", "value1");  // ✅ Safe to share
+> }
+> void worker_thread_2() {
+>     shared_ops.get("key2");  // ✅ Safe to share
+> }
+> ```
+> 
+> **✅ PATTERN 2: Use separate instances when threads operate on different buckets**
+> ```cpp
+> void worker_thread1() {
+>     brpc::CouchbaseOperations ops_bucket1;
+>     ops_bucket1.authenticate(username, password, "server:11210", "bucket1");
+>     ops_bucket1.add("key1", "value1");
+> }
+> void worker_thread2() {
+>     brpc::CouchbaseOperations ops_bucket2;
+>     ops_bucket2.authenticate(username, password, "server:11210", "bucket2");
+>     ops_bucket2.add("key1", "value1"); 
+> }
+> ```
+> 
+> **✅ PATTERN 3: Use separate instances when threads operate on different servers**
+> ```cpp
+> void worker_thread1() {
+>     brpc::CouchbaseOperations ops_server1;
+>     ops_server1.authenticate(username, password, "server1:11210", "bucket1");
+>     ops_server1.add("key1", "value1");
+> }
+> void worker_thread2() {
+>     brpc::CouchbaseOperations ops_server2;
+>     ops_server2.authenticate(username, password, "server2:11210", "bucket2");
+>     ops_server2.add("key1", "value1"); 
+> }
+> ```
+>
+> **For additional Couchbase features, consider the couchbase-cxx-SDK version of bRPC, which provides a more complete set of Couchbase features and can be accessed at [Couchbaselabs-cb-brpc](https://github.com/couchbaselabs/cb_brpc/tree/couchbase_sdk_brpc).**
+
+
+Contributions and issue reports are welcome!
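The multithreaded example's `main` calls `g_stats.aggregate_stats()`, but the excerpts in the guide do not show that method. The following is a minimal sketch of what such a method could look like, assuming it simply sums the per-thread counters into `total` once all workers have been joined. The method body and the `record_result` helper are illustrative assumptions, not the code shipped in `multithreaded_couchbase_client.cpp`.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

const int NUM_THREADS = 20;

// Same counters as in the guide's ThreadStats struct.
struct ThreadStats {
    std::atomic<int> operations_attempted{0};
    std::atomic<int> operations_successful{0};
    std::atomic<int> operations_failed{0};
};

// Hypothetical helper: record the outcome of a single operation.
inline void record_result(ThreadStats& stats, bool success) {
    stats.operations_attempted++;
    if (success) {
        stats.operations_successful++;
    } else {
        stats.operations_failed++;
    }
}

struct GlobalStats {
    ThreadStats total;
    std::vector<ThreadStats> per_thread_stats;
    GlobalStats() : per_thread_stats(NUM_THREADS) {}

    // Assumed behavior: overwrite `total` with the sum of all per-thread
    // counters. Intended to be called after every worker has been joined,
    // so the per-thread values no longer change while they are summed.
    void aggregate_stats() {
        int attempted = 0;
        int successful = 0;
        int failed = 0;
        for (const ThreadStats& s : per_thread_stats) {
            attempted += s.operations_attempted.load();
            successful += s.operations_successful.load();
            failed += s.operations_failed.load();
        }
        total.operations_attempted.store(attempted);
        total.operations_successful.store(successful);
        total.operations_failed.store(failed);
    }
};
```

Because each worker writes only to its own `ThreadStats` slot, the workers never contend on a shared counter; the totals are computed once, at the end, by the thread that joined them.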
diff --git a/example/couchbase_c++/Makefile b/example/couchbase_c++/Makefile
new file mode 100644
index 0000000..f41e4b6
--- /dev/null
+++ b/example/couchbase_c++/Makefile
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+BRPC_PATH = ../../
+include $(BRPC_PATH)/config.mk
+CXXFLAGS+=$(CPPFLAGS) -std=c++17 -DNDEBUG -O2 -pipe -W -Wall -fPIC -fno-omit-frame-pointer
+HDRS+=$(BRPC_PATH)/output/include
+LIBS+=$(BRPC_PATH)/output/lib
+HDRPATHS = $(addprefix -I, $(HDRS))
+LIBPATHS = $(addprefix -L, $(LIBS))
+COMMA=,
+SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS))
+
+# Define targets and their sources
+TARGETS = couchbase_client multithreaded_couchbase_client traditional_brpc_couchbase_client
+COUCHBASE_CLIENT_OBJS = couchbase_client.o
+MULTITHREADED_CLIENT_OBJS = multithreaded_couchbase_client.o
+TRADITIONAL_CLIENT_OBJS = traditional_brpc_couchbase_client.o
+ALL_OBJS = $(COUCHBASE_CLIENT_OBJS) $(MULTITHREADED_CLIENT_OBJS) $(TRADITIONAL_CLIENT_OBJS)
+
+ifeq ($(SYSTEM),Darwin)
+ ifneq ("$(LINK_SO)", "")
+	STATIC_LINKINGS += -lbrpc
+ else
+	# *.a must be explicitly specified in clang
+	STATIC_LINKINGS += $(BRPC_PATH)/output/lib/libbrpc.a
+ endif
+	LINK_OPTIONS_SO = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+	LINK_OPTIONS = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+else ifeq ($(SYSTEM),Linux)
+	STATIC_LINKINGS += -lbrpc
+	LINK_OPTIONS_SO = -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+	LINK_OPTIONS = -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS)
+endif
+
+.PHONY: all clean
+
+# Default target builds all three clients
+all: $(TARGETS)
+
+clean:
+	@echo "> Cleaning"
+	rm -rf $(TARGETS) $(ALL_OBJS)
+
+# Build rules for individual targets
+couchbase_client: $(COUCHBASE_CLIENT_OBJS)
+	@echo "> Linking $@"
+ifneq ("$(LINK_SO)", "")
+	$(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
+else
+	$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
+endif
+
+multithreaded_couchbase_client: $(MULTITHREADED_CLIENT_OBJS)
+	@echo "> Linking $@"
+ifneq ("$(LINK_SO)", "")
+	$(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
+else
+	$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
+endif
+
+traditional_brpc_couchbase_client: $(TRADITIONAL_CLIENT_OBJS)
+	@echo "> Linking $@"
+ifneq ("$(LINK_SO)", "")
+	$(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
+else
+	$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
+endif
+
+# Compilation rules
+couchbase_client.o: couchbase_client.cpp
+	@echo "> Compiling $@"
+	$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
+
+multithreaded_couchbase_client.o: multithreaded_couchbase_client.cpp
+	@echo "> Compiling $@"
+	$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
+
+traditional_brpc_couchbase_client.o: traditional_brpc_couchbase_client.cpp
+	@echo "> Compiling $@"
+	$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
\ No newline at end of file
diff --git a/example/couchbase_c++/couchbase_client.cpp b/example/couchbase_c++/couchbase_client.cpp
new file mode 100644
index 0000000..b1dc906
--- /dev/null
+++ b/example/couchbase_c++/couchbase_client.cpp
@@ -0,0 +1,451 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <brpc/couchbase.h>
+#include <butil/logging.h>
+#include <gflags/gflags.h>
+
+#include <iostream>
+#include <string>
+
+// ANSI color codes for console output
+#define GREEN "\033[32m"
+#define RED "\033[31m"
+#define RESET "\033[0m"
+
+DEFINE_string(server, "localhost:11210", "Couchbase server address (host:port)");
+int performOperations(brpc::CouchbaseOperations& couchbase_ops) {
+  std::string add_key = "user::test_brpc_binprot";
+  std::string add_value =
+      R"({"name": "John Doe", "age": 30, "email": "john@example.com"})";
+
+  brpc::CouchbaseOperations::Result add_result =
+      couchbase_ops.add(add_key, add_value);
+  if (add_result.success) {
+    std::cout << GREEN << "ADD operation successful" << RESET << std::endl;
+  } else {
+    std::cout << RED << "ADD operation failed: " << add_result.error_message
+              << RESET << std::endl;
+  }
+
+  // Try to ADD the same key again (should fail with key exists)
+  brpc::CouchbaseOperations::Result add_result2 =
+      couchbase_ops.add(add_key, add_value);
+  if (add_result2.success) {
+    std::cout << GREEN << "Second ADD operation unexpectedly successful"
+              << RESET << std::endl;
+  } else {
+    std::cout << RED << "Second ADD operation failed as expected: "
+              << add_result2.error_message << RESET << std::endl;
+  }
+  // Get operation using high-level method
+  brpc::CouchbaseOperations::Result get_result = couchbase_ops.get(add_key);
+  if (get_result.success) {
+    std::cout << GREEN << "GET operation successful" << RESET << std::endl;
+    std::cout << "GET value: " << get_result.value << std::endl;
+  } else {
+    std::cout << RED << "GET operation failed: " << get_result.error_message
+              << RESET << std::endl;
+  }
+
+  // Add binprot item1 using high-level method
+  std::string item1_key = "binprot_item1";
+  brpc::CouchbaseOperations::Result item1_result =
+      couchbase_ops.add(item1_key, add_value);
+  if (item1_result.success) {
+    std::cout << GREEN << "ADD binprot item1 successful" << RESET << std::endl;
+  } else {
+    std::cout << RED
+              << "ADD binprot item1 failed: " << item1_result.error_message
+              << RESET << std::endl;
+  }
+
+  // Add binprot item2 using high-level method
+  std::string item2_key = "binprot_item2";
+  brpc::CouchbaseOperations::Result item2_result =
+      couchbase_ops.add(item2_key, add_value);
+  if (item2_result.success) {
+    std::cout << GREEN << "ADD binprot item2 successful" << RESET << std::endl;
+  } else {
+    std::cout << RED
+              << "ADD binprot item2 failed: " << item2_result.error_message
+              << RESET << std::endl;
+  }
+
+  // Add binprot item3 using high-level method
+  std::string item3_key = "binprot_item3";
+  brpc::CouchbaseOperations::Result item3_result =
+      couchbase_ops.add(item3_key, add_value);
+  if (item3_result.success) {
+    std::cout << GREEN << "ADD binprot item3 successful" << RESET << std::endl;
+  } else {
+    std::cout << RED
+              << "ADD binprot item3 failed: " << item3_result.error_message
+              << RESET << std::endl;
+  }
+
+  // Perform an UPSERT on the existing key using high-level method
+  std::string upsert_key = "user::test_brpc_binprot";
+  std::string upsert_value =
+      R"({"name": "Upserted Jane Doe", "age": 28, "email": "upserted.doe@example.com"})";
+  brpc::CouchbaseOperations::Result upsert_result =
+      couchbase_ops.upsert(upsert_key, upsert_value);
+  if (upsert_result.success) {
+    std::cout
+        << GREEN
+        << "UPSERT operation successful when the document exists in the server"
+        << RESET << std::endl;
+  } else {
+    std::cout
+        << RED
+        << "UPSERT operation failed when the document exists in the server: "
+        << upsert_result.error_message << RESET << std::endl;
+  }
+  // Do UPSERT operation on a new document using high-level method
+  std::string new_upsert_key = "user::test_brpc_new_upsert";
+  std::string new_upsert_value =
+      R"({"name": "Jane Doe", "age": 28, "email": "jane.doe@example.com"})";
+  brpc::CouchbaseOperations::Result new_upsert_result =
+      couchbase_ops.upsert(new_upsert_key, new_upsert_value);
+  if (new_upsert_result.success) {
+    std::cout << GREEN
+              << "UPSERT operation successful when the document doesn't exist "
+                 "in the server"
+              << RESET << std::endl;
+  } else {
+    std::cout << RED
+              << "UPSERT operation failed when document does not exist in the "
+                 "server: "
+              << new_upsert_result.error_message << RESET << std::endl;
+  }
+
+  // Check the upserted data using high-level method
+  std::string check_key = "user::test_brpc_new_upsert";
+  brpc::CouchbaseOperations::Result check_result = couchbase_ops.get(check_key);
+  if (check_result.success) {
+    std::cout << GREEN << "GET after UPSERT operation successful - Value: "
+              << check_result.value << RESET << std::endl;
+  } else {
+    std::cout << RED << "GET after UPSERT operation failed: "
+              << check_result.error_message << RESET << std::endl;
+  }
+
+  // Delete a non-existent key using high-level method
+  std::string delete_key = "Nonexistent_key";
+  brpc::CouchbaseOperations::Result delete_result =
+      couchbase_ops.delete_(delete_key);
+  if (delete_result.success) {
+    std::cout << GREEN << "DELETE unexpectedly succeeded" << RESET << std::endl;
+  } else {
+    std::cout << RED << "DELETE operation failed as expected: "
+              << delete_result.error_message << RESET << std::endl;
+  }
+
+  // Delete the existing key using high-level method
+  std::string delete_existing_key = "user::test_brpc_binprot";
+  brpc::CouchbaseOperations::Result delete_existing_result =
+      couchbase_ops.delete_(delete_existing_key);
+  if (delete_existing_result.success) {
+    std::cout << GREEN << "DELETE operation successful" << RESET << std::endl;
+  } else {
+    std::cout << RED << "DELETE operation failed: "
+              << delete_existing_result.error_message << RESET << std::endl;
+  }
+
+  // Target collection `col1` in scope `_default`; the operations below
+  // take the collection by name.
+  const std::string scope_name = "_default";  // default scope
+  std::string collection_name = "col1";       // target collection
+  // ------------------------------------------------------------------
+  // Collection-scoped CRUD operations
+  // ------------------------------------------------------------------
+  // 1. ADD in collection using high-level method
+  std::string coll_key = "user::collection_doc";
+  std::string coll_value = R"({"type":"collection","op":"add","v":1})";
+  brpc::CouchbaseOperations::Result coll_add_result =
+      couchbase_ops.add(coll_key, coll_value, collection_name);
+  if (coll_add_result.success) {
+    std::cout << GREEN << "Collection ADD success" << RESET << std::endl;
+  } else {
+    std::cout << RED
+              << "Collection ADD failed: " << coll_add_result.error_message
+              << RESET << std::endl;
+  }
+  // 2. GET from collection using high-level method
+  brpc::CouchbaseOperations::Result coll_get_result =
+      couchbase_ops.get(coll_key, collection_name);
+  if (coll_get_result.success) {
+    std::cout << GREEN
+              << "Collection GET success value=" << coll_get_result.value
+              << RESET << std::endl;
+  } else {
+    std::cout << RED
+              << "Collection GET failed: " << coll_get_result.error_message
+              << RESET << std::endl;
+  }
+
+  // 3. UPSERT in collection using high-level method
+  std::string coll_upsert_value =
+      R"({"type":"collection","op":"upsert","v":2})";
+  brpc::CouchbaseOperations::Result coll_upsert_result =
+      couchbase_ops.upsert(coll_key, coll_upsert_value, collection_name);
+  if (coll_upsert_result.success) {
+    std::cout << GREEN << "Collection UPSERT success" << RESET << std::endl;
+  } else {
+    std::cout << RED << "Collection UPSERT failed: "
+              << coll_upsert_result.error_message << RESET << std::endl;
+  }
+
+  // 4. GET again to verify upsert using high-level method
+  brpc::CouchbaseOperations::Result coll_get2_result =
+      couchbase_ops.get(coll_key, collection_name);
+  if (coll_get2_result.success) {
+    std::cout << GREEN
+              << "Collection GET(after upsert) value=" << coll_get2_result.value
+              << RESET << std::endl;
+  }
+
+  // 5. DELETE from collection using high-level method
+  brpc::CouchbaseOperations::Result coll_del_result =
+      couchbase_ops.delete_(coll_key, collection_name);
+  if (coll_del_result.success) {
+    std::cout << GREEN << "Collection DELETE success" << RESET << std::endl;
+  } else {
+    std::cout << RED
+              << "Collection DELETE failed: " << coll_del_result.error_message
+              << RESET << std::endl;
+  }
+
+  // ------------------------------------------------------------------
+  // Pipeline Operations Demo
+  // ------------------------------------------------------------------
+  std::cout << GREEN << "\n=== Pipeline Operations Demo ===" << RESET
+            << std::endl;
+
+  // Begin a new pipeline
+  if (!couchbase_ops.beginPipeline()) {
+    std::cout << RED << "Failed to begin pipeline" << RESET << std::endl;
+    return -1;
+  }
+
+  std::cout << "Pipeline started. Adding multiple operations..." << std::endl;
+
+  // Add multiple operations to the pipeline
+  std::string pipeline_key1 = "pipeline::doc1";
+  std::string pipeline_key2 = "pipeline::doc2";
+  std::string pipeline_key3 = "pipeline::doc3";
+  std::string pipeline_value1 = R"({"operation": "pipeline_add", "id": 1})";
+  std::string pipeline_value2 = R"({"operation": "pipeline_upsert", "id": 2})";
+  std::string pipeline_value3 = R"({"operation": "pipeline_add", "id": 3})";
+
+  // Pipeline operations - all prepared but not yet executed
+  bool pipeline_success = true;
+  pipeline_success &= couchbase_ops.pipelineRequest(
+      brpc::CouchbaseOperations::ADD, pipeline_key1, pipeline_value1);
+  pipeline_success &= couchbase_ops.pipelineRequest(
+      brpc::CouchbaseOperations::UPSERT, pipeline_key2, pipeline_value2);
+  pipeline_success &= couchbase_ops.pipelineRequest(
+      brpc::CouchbaseOperations::ADD, pipeline_key3, pipeline_value3);
+  pipeline_success &= couchbase_ops.pipelineRequest(
+      brpc::CouchbaseOperations::GET, pipeline_key1);
+  pipeline_success &= couchbase_ops.pipelineRequest(
+      brpc::CouchbaseOperations::GET, pipeline_key2);
+
+  if (!pipeline_success) {
+    std::cout << RED << "Failed to add operations to pipeline" << RESET
+              << std::endl;
+    couchbase_ops.clearPipeline();
+    return -1;
+  }
+
+  std::cout << "Added " << couchbase_ops.getPipelineSize()
+            << " operations to pipeline" << std::endl;
+
+  // Execute all operations in a single network call
+  std::cout << "Executing pipeline operations..." << std::endl;
+  std::vector<brpc::CouchbaseOperations::Result> pipeline_results =
+      couchbase_ops.executePipeline();
+
+  // Process results in order
+  std::cout << GREEN << "Pipeline execution completed. Results:" << RESET
+            << std::endl;
+  for (size_t i = 0; i < pipeline_results.size(); ++i) {
+    const auto& result = pipeline_results[i];
+    if (result.success) {
+      if (!result.value.empty()) {
+        std::cout << GREEN << "  Operation " << (i + 1)
+                  << " SUCCESS - Value: " << result.value << RESET << std::endl;
+      } else {
+        std::cout << GREEN << "  Operation " << (i + 1) << " SUCCESS" << RESET
+                  << std::endl;
+      }
+    } else {
+      std::cout << RED << "  Operation " << (i + 1)
+                << " FAILED: " << result.error_message << RESET << std::endl;
+    }
+  }
+
+  // Demonstrate pipeline with collection operations
+  std::cout << GREEN << "\n=== Pipeline with Collection Operations ===" << RESET
+            << std::endl;
+
+  if (!couchbase_ops.beginPipeline()) {
+    std::cout << RED << "Failed to begin collection pipeline" << RESET
+              << std::endl;
+    return -1;
+  }
+
+  std::string coll_pipeline_key1 = "coll_pipeline::doc1";
+  std::string coll_pipeline_key2 = "coll_pipeline::doc2";
+  std::string coll_pipeline_value1 =
+      R"({"collection_operation": "pipeline_add", "id": 1})";
+  std::string coll_pipeline_value2 =
+      R"({"collection_operation": "pipeline_upsert", "id": 2})";
+
+  // Add collection-scoped operations to pipeline
+  bool coll_pipeline_success = true;
+  coll_pipeline_success &= couchbase_ops.pipelineRequest(
+      brpc::CouchbaseOperations::ADD, coll_pipeline_key1, coll_pipeline_value1,
+      collection_name);
+  coll_pipeline_success &= couchbase_ops.pipelineRequest(
+      brpc::CouchbaseOperations::UPSERT, coll_pipeline_key2,
+      coll_pipeline_value2, collection_name);
+  coll_pipeline_success &= couchbase_ops.pipelineRequest(
+      brpc::CouchbaseOperations::GET, coll_pipeline_key1, "", collection_name);
+  coll_pipeline_success &=
+      couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::DELETE,
+                                    coll_pipeline_key1, "", collection_name);
+
+  if (!coll_pipeline_success) {
+    std::cout << RED << "Failed to add collection operations to pipeline"
+              << RESET << std::endl;
+    couchbase_ops.clearPipeline();
+    return -1;
+  }
+
+  // Execute collection pipeline
+  std::vector<brpc::CouchbaseOperations::Result> coll_pipeline_results =
+      couchbase_ops.executePipeline();
+
+  std::cout << GREEN
+            << "Collection pipeline execution completed. Results:" << RESET
+            << std::endl;
+  for (size_t i = 0; i < coll_pipeline_results.size(); ++i) {
+    const auto& result = coll_pipeline_results[i];
+    if (result.success) {
+      if (!result.value.empty()) {
+        std::cout << GREEN << "  Collection Operation " << (i + 1)
+                  << " SUCCESS - Value: " << result.value << RESET << std::endl;
+      } else {
+        std::cout << GREEN << "  Collection Operation " << (i + 1) << " SUCCESS"
+                  << RESET << std::endl;
+      }
+    } else {
+      std::cout << RED << "  Collection Operation " << (i + 1)
+                << " FAILED: " << result.error_message << RESET << std::endl;
+    }
+  }
+
+  // Clean up remaining pipeline documents
+  std::cout << GREEN << "\n=== Cleanup Pipeline Demo ===" << RESET << std::endl;
+  if (couchbase_ops.beginPipeline()) {
+    couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::DELETE,
+                                  pipeline_key1);
+    couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::DELETE,
+                                  pipeline_key2);
+    couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::DELETE,
+                                  pipeline_key3);
+    couchbase_ops.pipelineRequest(brpc::CouchbaseOperations::DELETE,
+                                  coll_pipeline_key2, "", collection_name);
+
+    std::vector<brpc::CouchbaseOperations::Result> cleanup_results =
+        couchbase_ops.executePipeline();
+    std::cout << "Cleanup completed (" << cleanup_results.size()
+              << " operations)" << std::endl;
+  }
+
+  std::cout << GREEN << "\n=== All operations completed ===" << RESET
+            << std::endl;
+  return 0;
+}
+int main() {
+  // Create CouchbaseOperations instance for high-level operations
+  brpc::CouchbaseOperations couchbase_ops;
+
+  // std::cout << GREEN << "Using high-level CouchbaseOperations interface"
+  //           << RESET << std::endl;
+
+  // Default credentials; the prompt loop below only runs if either is empty
+  std::string username = "Administrator";
+  std::string password = "password";
+  while (username.empty() || password.empty()) {
+    std::cout << "Enter Couchbase username: ";
+    std::cin >> username;
+    if (username.empty()) {
+      std::cout << "Username cannot be empty. Please enter again." << std::endl;
+      continue;
+    }
+    std::cout << "Enter Couchbase password: ";
+    std::cin >> password;
+    if (password.empty()) {
+      std::cout << "Password cannot be empty. Please enter again." << std::endl;
+      continue;
+    }
+  }
+
+  // Use high-level authentication method
+  // When connecting to Capella, use couchbase_ops.authenticate(username,
+  // password, FLAGS_server, true, "path/to/cert.txt");
+  brpc::CouchbaseOperations::Result auth_result =
+      couchbase_ops.authenticate(username, password, FLAGS_server, "testing");
+  if (!auth_result.success) {
+    LOG(ERROR) << "Authentication failed: " << auth_result.error_message;
+    return -1;
+  }
+
+  std::cout
+      << GREEN
+      << "Authentication successful, proceeding with Couchbase operations..."
+      << RESET << std::endl;
+
+  performOperations(couchbase_ops);
+
+  // Change bucket Selection
+  std::string bucket_name = "testing";
+  while (bucket_name.empty()) {
+    std::cout << "Enter Couchbase bucket name: ";
+    std::cin >> bucket_name;
+    if (bucket_name.empty()) {
+      std::cout << "Bucket name cannot be empty. Please enter again."
+                << std::endl;
+      continue;
+    }
+  }
+
+  // Use high-level bucket selection method
+  brpc::CouchbaseOperations::Result bucket_result =
+      couchbase_ops.selectBucket(bucket_name);
+  if (!bucket_result.success) {
+    LOG(ERROR) << "Bucket selection failed: " << bucket_result.error_message;
+    return -1;
+  } else {
+    std::cout << GREEN << "Bucket Selection Successful" << RESET << std::endl;
+  }
+  // Repeat the same operations against the selected bucket
+  performOperations(couchbase_ops);
+  return 0;
+}
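Note on the pipeline demo above: it queues requests with pipelineRequest() and then walks the vector returned by executePipeline() by index, so it depends on results coming back in submission order. The standalone sketch below mimics that contract without bRPC. `FakePipeline`, `Op`, and `Result` are hypothetical stand-ins written only for illustration, and the in-memory map is an assumption standing in for the server.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; none of these types exist in bRPC.
enum class Op { ADD, UPSERT, GET, DELETE };

struct Result {
  bool success = false;
  std::string value;
  std::string error_message;
};

class FakePipeline {
  struct Entry {
    Op op;
    std::string key;
    std::string value;
  };

 public:
  // Queue an operation; the store is not touched until execute().
  void request(Op op, const std::string& key, const std::string& value = "") {
    queued_.push_back(Entry{op, key, value});
  }

  std::size_t size() const { return queued_.size(); }

  // Apply queued operations in submission order, one Result per operation.
  std::vector<Result> execute() {
    const std::size_t count = queued_.size();
    std::vector<Result> results;
    results.reserve(count);
    for (const Entry& e : queued_) {
      results.push_back(apply(e));
    }
    queued_.clear();
    assert(results.size() == count);  // results[i] belongs to request i
    return results;
  }

 private:
  Result apply(const Entry& e) {
    Result r;
    auto it = store_.find(e.key);
    switch (e.op) {
      case Op::ADD:  // fails if the key already exists
        if (it != store_.end()) {
          r.error_message = "key exists";
          return r;
        }
        store_[e.key] = e.value;
        break;
      case Op::UPSERT:  // inserts or overwrites
        store_[e.key] = e.value;
        break;
      case Op::GET:
        if (it == store_.end()) {
          r.error_message = "key not found";
          return r;
        }
        r.value = it->second;
        break;
      case Op::DELETE:
        if (it == store_.end()) {
          r.error_message = "key not found";
          return r;
        }
        store_.erase(it);
        break;
    }
    r.success = true;
    return r;
  }

  std::vector<Entry> queued_;
  std::map<std::string, std::string> store_;
};
```

Queuing ADD, GET, ADD for the same key and calling execute() gives three results in that order: the first two succeed and the third fails because the key already exists. A failure in one slot does not shift the others, which is why the demo can report "Operation i" from the loop index alone.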
diff --git a/example/couchbase_c++/multithreaded_couchbase_client.cpp b/example/couchbase_c++/multithreaded_couchbase_client.cpp
new file mode 100644
index 0000000..d6dd9e5
--- /dev/null
+++ b/example/couchbase_c++/multithreaded_couchbase_client.cpp
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <brpc/couchbase.h>
+#include <bthread/bthread.h>
+#include <butil/logging.h>
+#include <butil/string_printf.h>
+
+#include <atomic>
+#include <iostream>
+#include <string>
+#include <vector>
+
+// ANSI color codes
+#define GREEN "\033[32m"
+#define RED "\033[31m"
+#define BLUE "\033[34m"
+#define YELLOW "\033[33m"
+#define CYAN "\033[36m"
+#define RESET "\033[0m"
+
+const int NUM_THREADS = 20;
+const int THREADS_PER_BUCKET = 5;
+
+// Simple global config
+struct {
+  std::string username = "Administrator";
+  std::string password = "password";
+  std::vector<std::string> bucket_names = {"t0", "t1", "t2", "t3"};
+} g_config;
+
+// Simple thread statistics
+struct ThreadStats {
+  std::atomic<int> operations_attempted{0};
+  std::atomic<int> operations_successful{0};
+  std::atomic<int> operations_failed{0};
+
+  void reset() {
+    operations_attempted = 0;
+    operations_successful = 0;
+    operations_failed = 0;
+  }
+};
+
+// Global statistics
+struct GlobalStats {
+  ThreadStats total;
+  std::vector<ThreadStats> per_thread_stats;
+
+  GlobalStats() : per_thread_stats(NUM_THREADS) {}
+
+  void aggregate_stats() {
+    total.reset();
+    for (const auto& stats : per_thread_stats) {
+      total.operations_attempted += stats.operations_attempted.load();
+      total.operations_successful += stats.operations_successful.load();
+      total.operations_failed += stats.operations_failed.load();
+    }
+  }
+} g_stats;
+
+// Simple thread arguments
+struct ThreadArgs {
+  int thread_id;
+  int bucket_id;
+  std::string bucket_name;
+  brpc::CouchbaseOperations* couchbase_ops;
+  ThreadStats* stats;
+};
+
+// Simple CRUD operations on default collection
+void perform_crud_operations_default(brpc::CouchbaseOperations& couchbase_ops,
+                                     const std::string& base_key,
+                                     ThreadStats* stats) {
+  std::string key = base_key + "_default";
+  std::string value = butil::string_printf(
+      R"({"thread_id": %d, "collection": "default"})", (int)bthread_self());
+
+  stats->operations_attempted++;
+
+  // UPSERT
+  brpc::CouchbaseOperations::Result result = couchbase_ops.upsert(key, value);
+  if (result.success) {
+    stats->operations_successful++;
+  } else {
+    stats->operations_failed++;
+    return;
+  }
+
+  stats->operations_attempted++;
+
+  // GET
+  result = couchbase_ops.get(key);
+  if (result.success) {
+    stats->operations_successful++;
+  } else {
+    stats->operations_failed++;
+    return;
+  }
+
+  stats->operations_attempted++;
+
+  // DELETE
+  result = couchbase_ops.delete_(key);
+  if (result.success) {
+    stats->operations_successful++;
+  } else {
+    stats->operations_failed++;
+  }
+}
+
+// Simple CRUD operations on col1 collection
+void perform_crud_operations_col1(brpc::CouchbaseOperations& couchbase_ops,
+                                  const std::string& base_key,
+                                  ThreadStats* stats) {
+  std::string key = base_key + "_col1";
+  std::string value = butil::string_printf(
+      R"({"thread_id": %d, "collection": "col1"})", (int)bthread_self());
+
+  stats->operations_attempted++;
+
+  // UPSERT
+  brpc::CouchbaseOperations::Result result =
+      couchbase_ops.upsert(key, value, "col1");
+  if (result.success) {
+    stats->operations_successful++;
+  } else {
+    stats->operations_failed++;
+    std::cout << "UPSERT failed: " << result.error_message << std::endl;
+    return;
+  }
+
+  stats->operations_attempted++;
+
+  // GET
+  result = couchbase_ops.get(key, "col1");
+  if (result.success) {
+    stats->operations_successful++;
+  } else {
+    stats->operations_failed++;
+    std::cout << "GET failed: " << result.error_message << std::endl;
+    return;
+  }
+
+  stats->operations_attempted++;
+
+  // DELETE
+  result = couchbase_ops.delete_(key, "col1");
+  if (result.success) {
+    stats->operations_successful++;
+  } else {
+    stats->operations_failed++;
+  }
+}
+
+// Simple thread worker function
+void* thread_worker(void* arg) {
+  ThreadArgs* args = static_cast<ThreadArgs*>(arg);
+
+  std::cout << CYAN << "Thread " << args->thread_id << " starting on bucket "
+            << args->bucket_name << RESET << std::endl;
+
+  // Create CouchbaseOperations instance for this thread
+  brpc::CouchbaseOperations couchbase_ops;
+
+  // Authentication
+  brpc::CouchbaseOperations::Result auth_result = couchbase_ops.authenticate(
+      g_config.username, g_config.password, "127.0.0.1:11210", args->bucket_name);
+  // For SSL, use couchbase_ops.authenticateSSL(username, password,
+  // "127.0.0.1:11210", args->bucket_name, "/path/to/cert.txt") instead.
+
+  if (!auth_result.success) {
+    std::cout << RED << "Thread " << args->thread_id << ": Auth failed - "
+              << auth_result.error_message << RESET << std::endl;
+    return NULL;
+  }
+
+    // // Select bucket
+    // brpc::CouchbaseOperations::Result bucket_result =
+    //     couchbase_ops.selectBucket(args->bucket_name);
+
+    // if (!bucket_result.success) {
+    //   std::cout << RED << "Thread " << args->thread_id
+    //             << ": Bucket selection failed - " << bucket_result.error_message
+    //             << RESET << std::endl;
+    //   return NULL;
+    // }
+
+  std::cout << GREEN << "Thread " << args->thread_id << " connected to bucket "
+            << args->bucket_name << RESET << std::endl;
+
+  // Perform operations - 10 times on default collection, 10 times on col1
+  // collection
+  for (int i = 0; i < 10; ++i) {
+    std::string base_key =
+        butil::string_printf("thread_%d_op_%d", args->thread_id, i);
+
+    // CRUD operations on default collection
+    perform_crud_operations_default(couchbase_ops, base_key, args->stats);
+
+    // CRUD operations on col1 collection
+    perform_crud_operations_col1(couchbase_ops, base_key, args->stats);
+
+    // Small delay between operations
+    bthread_usleep(10000);  // 10ms
+  }
+
+  int successful = args->stats->operations_successful.load();
+  int attempted = args->stats->operations_attempted.load();
+  int failed = args->stats->operations_failed.load();
+
+  std::cout << GREEN << "Thread " << args->thread_id
+            << " completed: " << successful << "/" << attempted
+            << " operations successful, " << failed << " failed" << RESET
+            << std::endl;
+
+  return NULL;
+}
+
+void* shared_object_thread_worker(void* arg) {
+  ThreadArgs* shared_args = static_cast<ThreadArgs*>(arg);
+  brpc::CouchbaseOperations* shared_couchbase_ops = shared_args->couchbase_ops;
+  // Run 10 rounds of CRUD on the default collection and on col1
+  for (int i = 0; i < 10; ++i) {
+    std::string base_key = butil::string_printf(
+        "shared_thread_op_%d_thread_id_%d", i, shared_args->thread_id);
+    // CRUD operations on default collection
+    perform_crud_operations_default(*shared_couchbase_ops, base_key,
+                                    shared_args->stats);
+    // CRUD operations on col1 collection
+    perform_crud_operations_col1(*shared_couchbase_ops, base_key,
+                                 shared_args->stats);
+    bthread_usleep(10000);  // 10ms delay between rounds
+  }
+  return NULL;
+}
+
+// Print simple statistics
+void print_stats() {
+  g_stats.aggregate_stats();
+
+  std::cout << std::endl;
+  std::cout << CYAN << "=== TEST RESULTS ===" << RESET << std::endl;
+
+  int total_attempted = g_stats.total.operations_attempted.load();
+  int total_successful = g_stats.total.operations_successful.load();
+  int total_failed = g_stats.total.operations_failed.load();
+
+  double success_rate = total_attempted > 0
+                            ? (double)total_successful / total_attempted * 100.0
+                            : 0.0;
+
+  std::cout << GREEN << "Overall Performance:" << RESET << std::endl;
+  std::cout << "  Total Operations: " << total_attempted << std::endl;
+  std::cout << "  Successful: " << total_successful << " (" << success_rate
+            << "%)" << std::endl;
+  std::cout << "  Failed: " << total_failed << std::endl;
+  std::cout << std::endl;
+
+  // Per-thread breakdown
+  std::cout << YELLOW << "Per-Thread Performance:" << RESET << std::endl;
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    const auto& stats = g_stats.per_thread_stats[i];
+    int attempted = stats.operations_attempted.load();
+    int successful = stats.operations_successful.load();
+    int failed = stats.operations_failed.load();
+
+    std::cout << "  Thread " << i << ": " << attempted << " ops, " << successful
+              << " success, " << failed << " failed" << std::endl;
+  }
+  std::cout << std::endl;
+}
+
+int main(int argc, char* argv[]) {
+  std::cout << GREEN << "Starting Simple Multithreaded Couchbase Client"
+            << RESET << std::endl;
+  std::cout
+      << YELLOW
+      << "20 threads: 5 per bucket (t0, t1, t2, t3)"
+      << RESET << std::endl;
+  std::cout << BLUE
+            << "Each thread performs CRUD operations on default collection and "
+               "col1 collection"
+            << RESET << std::endl;
+
+  // Create threads and arguments
+  std::vector<bthread_t> threads(NUM_THREADS);
+  std::vector<ThreadArgs> args(NUM_THREADS);
+
+  // Assign threads to buckets
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    args[i].thread_id = i;
+    args[i].bucket_id = i / THREADS_PER_BUCKET;
+    args[i].bucket_name = g_config.bucket_names[args[i].bucket_id];
+    args[i].stats = &g_stats.per_thread_stats[i];
+  }
+
+  // Print thread assignments
+  std::cout << "Thread Assignments:" << RESET << std::endl;
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    std::cout << "Thread " << i << " -> Bucket: " << args[i].bucket_name
+              << std::endl;
+  }
+  std::cout << std::endl;
+
+  // Start all threads
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    if (bthread_start_background(&threads[i], NULL, thread_worker, &args[i]) !=
+        0) {
+      std::cout << RED << "Failed to create thread " << i << RESET << std::endl;
+      return -1;
+    }
+  }
+
+  std::cout << GREEN << "All 20 threads started!" << RESET << std::endl;
+
+  // Wait for all threads to complete
+  std::cout << YELLOW << "Waiting for all threads to complete..." << RESET
+            << std::endl;
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    bthread_join(threads[i], NULL);
+  }
+
+  std::cout << GREEN << "All threads completed!" << RESET << std::endl;
+  // Create a CouchbaseOperations instance shared by all threads
+  brpc::CouchbaseOperations shared_couchbase_ops;
+  brpc::CouchbaseOperations::Result result = shared_couchbase_ops.authenticate(
+      g_config.username, g_config.password, "127.0.0.1:11210", "t0");
+  if (!result.success) {
+    std::cout << RED << "Shared CouchbaseOperations instance authentication "
+              << "failed: " << result.error_message << RESET << std::endl;
+    return -1;
+  }
+  std::cout << GREEN << "Shared CouchbaseOperations instance authenticated "
+            << "successfully!" << RESET << std::endl;
+
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    args[i].thread_id = i;
+    args[i].couchbase_ops = &shared_couchbase_ops;
+    args[i].bucket_id = 0;
+    args[i].bucket_name = "t0";
+    // args[i].stats is kept from the first phase, so counts accumulate
+  }
+
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    if (bthread_start_background(&threads[i], NULL,
+                                 shared_object_thread_worker, &args[i]) != 0) {
+      std::cout << RED << "Failed to create shared object thread " << i
+                << RESET << std::endl;
+      return -1;
+    }
+  }
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    bthread_join(threads[i], NULL);
+  }
+  std::cout << GREEN << "All shared object threads completed!" << RESET
+            << std::endl;
+  print_stats();  // Print aggregated statistics for both phases
+
+  return 0;
+}
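Note on the thread assignment above: thread i is mapped to bucket_names[i / THREADS_PER_BUCKET], which only stays in range while there are enough bucket names for every thread. A minimal sketch of the same mapping with an explicit bounds check is shown below. `assign_buckets` is a hypothetical helper for illustration and is not part of the example or of bRPC.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: returns the bucket name for each thread index,
// placing threads_per_bucket consecutive threads on the same bucket.
// Throws instead of indexing past the end of `buckets`.
std::vector<std::string> assign_buckets(
    int num_threads, int threads_per_bucket,
    const std::vector<std::string>& buckets) {
  if (num_threads < 0 || threads_per_bucket <= 0) {
    throw std::invalid_argument("thread counts must be positive");
  }
  std::vector<std::string> assignment;
  assignment.reserve(static_cast<std::size_t>(num_threads));
  for (int i = 0; i < num_threads; ++i) {
    // Same integer division as the example: threads 0..4 -> bucket 0, etc.
    const std::size_t bucket_id =
        static_cast<std::size_t>(i / threads_per_bucket);
    if (bucket_id >= buckets.size()) {
      throw std::out_of_range("not enough buckets for thread " +
                              std::to_string(i));
    }
    assignment.push_back(buckets[bucket_id]);
  }
  assert(assignment.size() == static_cast<std::size_t>(num_threads));
  return assignment;
}
```

With 20 threads, 5 per bucket, and the four names t0 to t3, threads 0 to 4 land on t0 and threads 15 to 19 on t3. Raising the thread count to 21 without adding a fifth bucket makes the helper throw, whereas the unchecked indexing in the example would read out of bounds.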
diff --git a/example/couchbase_c++/traditional_brpc_couchbase_client.cpp b/example/couchbase_c++/traditional_brpc_couchbase_client.cpp
new file mode 100644
index 0000000..c63c98c
--- /dev/null
+++ b/example/couchbase_c++/traditional_brpc_couchbase_client.cpp
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <brpc/couchbase.h>
+
+#include <iostream>
+#include <string>
+
+// ANSI color codes for console output
+#define GREEN "\033[32m"
+#define RED "\033[31m"
+#define RESET "\033[0m"
+
+int main() {
+  // traditional bRPC Couchbase client
+  brpc::Channel channel;
+  brpc::ChannelOptions options;
+  options.protocol = brpc::PROTOCOL_COUCHBASE;
+  options.connection_type = "single";
+  options.timeout_ms = 1000;  // 1 second
+  options.max_retry = 3;
+  if (channel.Init("localhost:11210", &options) != 0) {
+    LOG(ERROR) << "Failed to initialize channel";
+    return -1;
+  }
+  brpc::Controller cntl;
+  brpc::CouchbaseOperations::CouchbaseRequest req;
+  brpc::CouchbaseOperations::CouchbaseResponse res;
+  uint64_t cas;
+  req.authenticateRequest("Administrator", "password");
+  channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+  if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to authenticate: "
+               << cntl.ErrorText();
+    return -1;
+  } else {
+    if (res.popHello(&cas) && res.popAuthenticate(&cas)) {
+      std::cout << "Traditional bRPC Couchbase Client Authentication Successful"
+                << std::endl;
+    } else {
+      std::cout << "Client Authentication Failed with status code: " << std::hex
+                << res._status_code << std::endl;
+      return -1;
+    }
+  }
+  cntl.Reset();
+  // clearing request and response
+
+  req.Clear();
+  res.Clear();
+  req.selectBucketRequest("testing");
+  channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+  if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to select bucket: "
+               << cntl.ErrorText();
+    return -1;
+  } else {
+    if (res.popSelectBucket(&cas)) {
+      std::cout
+          << "Traditional bRPC Couchbase Client Bucket Selection Successful"
+          << std::endl;
+    } else {
+      // the status code will be updated only after you do
+      // popFunctionName(param).
+      std::cout << "Traditional bRPC Couchbase Client Bucket Selection Failed "
+                   "with status code: "
+                << std::hex << res._status_code << std::endl;
+      std::cout << "Error Message: " << res.lastError() << std::endl;
+      return -1;
+    }
+  }
+  cntl.Reset();
+  // clearing request and response
+
+  req.Clear();
+  res.Clear();
+  req.addRequest(
+      "sample_key",
+      R"({"name": "John Doe", "age": 30, "email": "john@example.com"})",
+      0 /*flags*/, 0 /*exptime*/, 0 /*cas*/);
+  channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+  if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to add key-value: "
+               << cntl.ErrorText();
+    return -1;
+  } else {
+    if (res.popAdd(&cas)) {
+      std::cout
+          << "Traditional bRPC Couchbase Client Key-Value Addition Successful"
+          << std::endl;
+    } else {
+      // the status code will be updated only after you do
+      // popFunctionName(param).
+      std::cout << "Traditional bRPC Couchbase Client Key-Value Addition "
+                   "Failed with status code: "
+                << std::hex << res._status_code << std::endl;
+      std::cout << "Error Message: " << res.lastError() << std::endl;
+      return -1;
+    }
+  }
+  cntl.Reset();
+
+  // clearing request and response before doing a getRequest
+  req.Clear();
+  res.Clear();
+  req.getRequest("sample_key");
+  channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+  if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to get value for key: "
+               << cntl.ErrorText();
+    return -1;
+  } else {
+    std::string value;
+    uint32_t flags;
+    if (res.popGet(&value, &flags, &cas)) {
+      std::cout
+          << "Traditional bRPC Couchbase Client Key-Value Retrieval Successful"
+          << std::endl;
+      std::cout << "Retrieved Value: " << value << std::endl;
+    } else {
+      // the status code will be updated only after you do
+      // popFunctionName(param).
+      std::cout << "Traditional bRPC Couchbase Client Key-Value Retrieval "
+                   "Failed with status code: "
+                << std::hex << res._status_code << std::endl;
+      std::cout << "Error Message: " << res.lastError() << std::endl;
+      return -1;
+    }
+  }
+  cntl.Reset();
+  // clearing request and response before doing a deleteRequest
+
+  req.Clear();
+  res.Clear();
+  req.deleteRequest("sample_key");
+  channel.CallMethod(NULL, &cntl, &req, &res, NULL);
+  if (cntl.Failed()) {
+    LOG(ERROR) << "Unable to delete key-value: "
+               << cntl.ErrorText();
+    return -1;
+  } else {
+    if (res.popDelete()) {
+      std::cout
+          << "Traditional bRPC Couchbase Client Key-Value Deletion Successful"
+          << std::endl;
+    } else {
+      // the status code will be updated only after you do
+      // popFunctionName(param).
+      std::cout << "Traditional bRPC Couchbase Client Key-Value Deletion "
+                   "Failed with status code: "
+                << std::hex << res._status_code << std::endl;
+      std::cout << "Error Message: " << res.lastError() << std::endl;
+      return -1;
+    }
+  }
+  return 0;
+}
\ No newline at end of file
diff --git a/src/brpc/couchbase.cpp b/src/brpc/couchbase.cpp
new file mode 100644
index 0000000..52e16dc
--- /dev/null
+++ b/src/brpc/couchbase.cpp
@@ -0,0 +1,2634 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/couchbase.h"
+
+#include <zlib.h>  // for crc32, used to compute the vBucket ID
+
+// Debug flag for enabling debug statements
+static bool DBUG = false;  // Set to true to enable debug logs
+
+// Debug print macro
+#define DEBUG_PRINT(msg)                           \
+  do {                                             \
+    if (DBUG) {                                    \
+      std::cout << "[DEBUG] " << msg << std::endl; \
+    }                                              \
+  } while (0)
+
+#include <iostream>
+
+#include "brpc/policy/couchbase_protocol.h"
+#include "brpc/proto_base.pb.h"
+#include "butil/logging.h"
+#include "butil/macros.h"
+#include "butil/string_printf.h"
+#include "butil/sys_byteorder.h"
+#include "butil/third_party/rapidjson/document.h"
+#include "butil/third_party/rapidjson/rapidjson.h"
+
+namespace brpc {
+
+// Couchbase protocol constants
+namespace {
+[[maybe_unused]] constexpr uint32_t APPLE_VBUCKET_COUNT = 64;
+constexpr uint32_t DEFAULT_VBUCKET_COUNT = 1024;
+constexpr int CONNECTION_ID_SIZE = 33;
+constexpr size_t RANDOM_ID_HEX_SIZE = 67;  // 33 bytes * 2 + null terminator
+}  // namespace
+
+// Static member definitions
+CouchbaseManifestManager*
+    CouchbaseOperations::CouchbaseRequest::metadata_tracking =
+        &common_metadata_tracking;
+
+bool brpc::CouchbaseManifestManager::setBucketToCollectionManifest(
+    std::string server, std::string bucket,
+    CouchbaseManifestManager::CollectionManifest manifest) {
+  // Update the collection manifest while holding the write lock
+  {
+    UniqueLock write_lock(rw_bucket_to_collection_manifest_mutex_);
+    bucket_to_collection_manifest_[server][bucket] = manifest;
+  }
+
+  return true;
+}
+
+bool brpc::CouchbaseManifestManager::getBucketToCollectionManifest(
+    std::string server, std::string bucket,
+    CouchbaseManifestManager::CollectionManifest* manifest) {
+  SharedLock read_lock(rw_bucket_to_collection_manifest_mutex_);
+  auto it1 = bucket_to_collection_manifest_.find(server);
+  if (it1 == bucket_to_collection_manifest_.end()) {
+    return false;
+  }
+  auto it2 = it1->second.find(bucket);
+  if (it2 == it1->second.end()) {
+    return false;
+  }
+  *manifest = it2->second;
+  return true;
+}
+
+bool brpc::CouchbaseManifestManager::getManifestToCollectionId(
+    CouchbaseManifestManager::CollectionManifest* manifest, std::string scope,
+    std::string collection, uint8_t* collection_id) {
+  if (manifest == nullptr || collection_id == nullptr) {
+    DEBUG_PRINT("Invalid input: manifest or collection_id is null");
+    return false;
+  }
+  auto it1 = manifest->scope_to_collection_id_map.find(scope);
+  if (it1 == manifest->scope_to_collection_id_map.end()) {
+    DEBUG_PRINT("Scope: " << scope << " not found in manifest");
+    return false;
+  }
+  auto it2 = it1->second.find(collection);
+  if (it2 == it1->second.end()) {
+    DEBUG_PRINT("Collection: " << collection
+                               << " not found in scope: " << scope);
+    return false;
+  }
+  *collection_id = it2->second;
+  return true;
+}
+
+bool CouchbaseManifestManager::jsonToCollectionManifest(
+    const std::string& json,
+    CouchbaseManifestManager::CollectionManifest* manifest) {
+  if (manifest == nullptr) {
+    DEBUG_PRINT("Invalid input: manifest is null");
+    return false;
+  }
+
+  // Clear existing data
+  manifest->uid.clear();
+  manifest->scope_to_collection_id_map.clear();
+
+  if (json.empty()) {
+    DEBUG_PRINT("JSON string is empty");
+    return false;
+  }
+
+  // Parse JSON using RapidJSON
+  BUTIL_RAPIDJSON_NAMESPACE::Document document;
+  document.Parse(json.c_str());
+
+  if (document.HasParseError()) {
+    DEBUG_PRINT("Failed to parse JSON: " << document.GetParseError());
+    return false;
+  }
+
+  if (!document.IsObject()) {
+    DEBUG_PRINT("JSON root is not an object");
+    return false;
+  }
+
+  // Extract uid
+  if (document.HasMember("uid") && document["uid"].IsString()) {
+    manifest->uid = document["uid"].GetString();
+  } else {
+    DEBUG_PRINT("Missing or invalid 'uid' field in JSON");
+    return false;
+  }
+
+  // Extract scopes
+  if (!document.HasMember("scopes") || !document["scopes"].IsArray()) {
+    DEBUG_PRINT("Missing or invalid 'scopes' field in JSON");
+    return false;
+  }
+
+  const BUTIL_RAPIDJSON_NAMESPACE::Value& scopes = document["scopes"];
+  for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < scopes.Size(); ++i) {
+    const BUTIL_RAPIDJSON_NAMESPACE::Value& scope = scopes[i];
+
+    if (!scope.IsObject()) {
+      DEBUG_PRINT("Scope at index " << i << " is not an object");
+      return false;
+    }
+
+    // Extract scope name
+    if (!scope.HasMember("name") || !scope["name"].IsString()) {
+      DEBUG_PRINT("Missing or invalid 'name' field in scope at index " << i);
+      return false;
+    }
+    std::string scope_name = scope["name"].GetString();
+
+    // Extract collections
+    if (!scope.HasMember("collections") || !scope["collections"].IsArray()) {
+      DEBUG_PRINT("Missing or invalid 'collections' field in scope '"
+                  << scope_name << "'");
+      return false;
+    }
+
+    const BUTIL_RAPIDJSON_NAMESPACE::Value& collections = scope["collections"];
+    std::unordered_map<std::string, uint8_t> collection_map;
+
+    for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < collections.Size();
+         ++j) {
+      const BUTIL_RAPIDJSON_NAMESPACE::Value& collection = collections[j];
+
+      if (!collection.IsObject()) {
+        DEBUG_PRINT("Collection at index " << j << " in scope '" << scope_name
+                                           << "' is not an object");
+        return false;
+      }
+
+      // Extract collection name
+      if (!collection.HasMember("name") || !collection["name"].IsString()) {
+        DEBUG_PRINT("Missing or invalid 'name' field in collection at index "
+                    << j << " in scope '" << scope_name << "'");
+        return false;
+      }
+      std::string collection_name = collection["name"].GetString();
+
+      // Extract collection uid (hex string)
+      if (!collection.HasMember("uid") || !collection["uid"].IsString()) {
+        DEBUG_PRINT("Missing or invalid 'uid' field in collection '"
+                    << collection_name << "' in scope '" << scope_name << "'");
+        return false;
+      }
+      std::string collection_uid_str = collection["uid"].GetString();
+
+      // Convert hex string to uint8_t
+      uint8_t collection_id = 0;
+      try {
+        // Convert hex string to integer
+        unsigned long uid_val = std::stoul(collection_uid_str, nullptr, 16);
+        if (uid_val > 255) {
+          DEBUG_PRINT(
+              "Collection uid '"
+              << collection_uid_str << "' exceeds uint8_t range in collection '"
+              << collection_name << "' in scope '" << scope_name << "'");
+          return false;
+        }
+        collection_id = static_cast<uint8_t>(uid_val);
+      } catch (const std::exception& e) {
+        DEBUG_PRINT("Failed to parse collection uid '"
+                    << collection_uid_str << "' as hex in collection '"
+                    << collection_name << "' in scope '" << scope_name << "': "
+                    << e.what());
+        return false;
+      }
+
+      // Add to collection map
+      collection_map[collection_name] = collection_id;
+    }
+
+    // Add scope and its collections to manifest
+    manifest->scope_to_collection_id_map[scope_name] =
+        std::move(collection_map);
+  }
+
+  return true;
+}
+
+bool CouchbaseManifestManager::refreshCollectionManifest(
+    brpc::Channel* channel, const std::string& server, const std::string& bucket,
+    std::unordered_map<std::string, CollectionManifest>*
+        local_collection_manifest_cache) {
+  // first fetch the manifest
+  // then compare the UID with the cached one
+  if (channel == nullptr) {
+    DEBUG_PRINT("No channel found, make sure to call Authenticate() first");
+    return false;
+  }
+  if (server.empty()) {
+    DEBUG_PRINT("Server is empty, make sure to call Authenticate() first");
+    return false;
+  }
+  if (bucket.empty()) {
+    DEBUG_PRINT("No bucket selected, make sure to call SelectBucket() first");
+    return false;
+  }
+  CouchbaseOperations::CouchbaseRequest temp_get_manifest_request;
+  CouchbaseOperations::CouchbaseResponse temp_get_manifest_response;
+  brpc::Controller temp_cntl;
+  temp_get_manifest_request.getCollectionManifest();
+  channel->CallMethod(NULL, &temp_cntl, &temp_get_manifest_request,
+                      &temp_get_manifest_response, NULL);
+  if (temp_cntl.Failed()) {
+    DEBUG_PRINT("Failed to get collection manifest: bRPC controller error "
+                << temp_cntl.ErrorText());
+    return false;
+  }
+  std::string manifest_json;
+  if (!temp_get_manifest_response.popManifest(&manifest_json)) {
+    DEBUG_PRINT("Failed to parse response for refreshing collection Manifest: "
+                << temp_get_manifest_response.lastError());
+    return false;
+  }
+  brpc::CouchbaseManifestManager::CollectionManifest manifest;
+  if (!common_metadata_tracking.jsonToCollectionManifest(manifest_json,
+                                                         &manifest)) {
+    DEBUG_PRINT("Failed to parse collection manifest JSON");
+    return false;
+  }
+  brpc::CouchbaseManifestManager::CollectionManifest cached_manifest;
+  if (!common_metadata_tracking.getBucketToCollectionManifest(
+          server, bucket, &cached_manifest)) {
+    // No cached manifest found, set the new one
+    if (!common_metadata_tracking.setBucketToCollectionManifest(server, bucket,
+                                                                manifest)) {
+      DEBUG_PRINT("Failed to cache collection manifest for bucket "
+                  << bucket << " on server " << server);
+      return false;
+    }
+    DEBUG_PRINT("Cached collection manifest for bucket "
+                << bucket << " on server " << server);
+    // also update the local cache
+    if (local_collection_manifest_cache != nullptr) {
+      (*local_collection_manifest_cache)[bucket] = manifest;
+    }
+    return true;
+  }
+  // Compare the UID with the cached one
+  // If they are different, refresh the cache
+  else if (manifest.uid != cached_manifest.uid) {
+    DEBUG_PRINT("Collection manifest has changed for bucket "
+                << bucket << " on server " << server);
+    if (!common_metadata_tracking.setBucketToCollectionManifest(server, bucket,
+                                                                manifest)) {
+      DEBUG_PRINT("Failed to update cached collection manifest for bucket "
+                  << bucket << " on server " << server);
+      return false;
+    }
+    DEBUG_PRINT("Updated cached collection manifest for bucket "
+                << bucket << " on server " << server);
+    // update the local cache as well
+    if (local_collection_manifest_cache != nullptr) {
+      (*local_collection_manifest_cache)[bucket] = manifest;
+      DEBUG_PRINT("Added to local collection manifest cache for bucket "
+                  << bucket << " on server " << server);
+    }
+    return true;
+  } else {
+    DEBUG_PRINT("Collection manifest is already up-to-date for bucket "
+                << bucket << " on server " << server);
+    if (local_collection_manifest_cache != nullptr) {
+      if (local_collection_manifest_cache->find(bucket) !=
+          local_collection_manifest_cache->end()) {
+        // if the bucket already exists in the local cache, check the UID
+        if ((*local_collection_manifest_cache)[bucket].uid != manifest.uid) {
+          // if the UID is different, update the local cache
+          (*local_collection_manifest_cache)[bucket] = manifest;
+          DEBUG_PRINT("Updated local collection manifest cache for bucket "
+                      << bucket << " on server " << server);
+        }
+      } else {
+        // if the bucket does not exist in the local cache, add it
+        (*local_collection_manifest_cache)[bucket] = manifest;
+        DEBUG_PRINT("Added to local collection manifest cache for bucket "
+                    << bucket << " on server " << server);
+      }
+    }
+    return false;  // manifest unchanged on the server, nothing refreshed
+  }
+}
+
+uint32_t CouchbaseOperations::CouchbaseRequest::hashCrc32(const char* key,
+                                                          size_t key_length) {
+  static const uint32_t crc32tab[256] = {
+      0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f,
+      0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
+      0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2,
+      0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
+      0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9,
+      0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
+      0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c,
+      0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
+      0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423,
+      0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
+      0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106,
+      0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
+      0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d,
+      0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
+      0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950,
+      0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
+      0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7,
+      0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
+      0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa,
+      0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
+      0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81,
+      0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
+      0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84,
+      0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
+      0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb,
+      0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
+      0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e,
+      0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
+      0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55,
+      0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
+      0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28,
+      0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
+      0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f,
+      0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
+      0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242,
+      0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
+      0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69,
+      0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
+      0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc,
+      0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
+      0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693,
+      0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
+      0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
+  };
+
+  uint64_t x;
+  uint32_t crc = UINT32_MAX;
+
+  for (x = 0; x < key_length; x++)
+    crc = (crc >> 8) ^ crc32tab[(crc ^ (uint64_t)key[x]) & 0xff];
+
+#ifdef __APPLE__
+  return ((~crc) >> 16) % APPLE_VBUCKET_COUNT;
+#else
+  return ((~crc) >> 16) % DEFAULT_VBUCKET_COUNT;
+#endif
+}
+
+void CouchbaseOperations::CouchbaseRequest::sharedCtor() {
+  _pipelined_count = 0;
+  _cached_size_ = 0;
+}
+
+void CouchbaseOperations::CouchbaseRequest::sharedDtor() {}
+
+void CouchbaseOperations::CouchbaseRequest::setCachedSize(int size) const {
+  _cached_size_ = size;
+}
+
+void CouchbaseOperations::CouchbaseRequest::Clear() {
+  _buf.clear();
+  _pipelined_count = 0;
+}
+
+// Support for scope level collections will be added in future.
+// Get the Scope ID for a given scope name
+// bool CouchbaseOperations::CouchbaseRequest::GetScopeId(const
+// butil::StringPiece& scope_name) {
+//   if (scope_name.empty()) {
+//     DEBUG_PRINT("Empty scope name");
+//     return false;
+//   }
+//   // Opcode 0xBC for Get Scope ID (see Collections.md)
+//   const policy::CouchbaseRequestHeader header = {
+//       policy::CB_MAGIC_REQUEST,
+//       policy::CB_GET_SCOPE_ID,
+//       butil::HostToNet16(scope_name.size()),
+//       0,  // no extras
+//       policy::CB_BINARY_RAW_BYTES,
+//       0,  // no vbucket
+//       butil::HostToNet32(scope_name.size()),
+//       0,  // opaque
+//       0   // no CAS
+//   };
+//   if (_buf.append(&header, sizeof(header))) {
+//     return false;
+//   }
+//   if (_buf.append(scope_name.data(), scope_name.size())) {
+//     return false;
+//   }
+//   ++_pipelined_count;
+//   return true;
+// }
+
+bool CouchbaseOperations::CouchbaseRequest::selectBucketRequest(
+    const butil::StringPiece& bucket_name) {
+  if (bucket_name.empty()) {
+    DEBUG_PRINT("Empty bucket name");
+    return false;
+  }
+  // construct the request header
+  const policy::CouchbaseRequestHeader header = {
+      policy::CB_MAGIC_REQUEST,
+      policy::CB_SELECT_BUCKET,
+      butil::HostToNet16(bucket_name.size()),
+      0,
+      policy::CB_BINARY_RAW_BYTES,
+      0,
+      butil::HostToNet32(bucket_name.size()),
+      0,
+      0};
+  if (_buf.append(&header, sizeof(header))) {
+    DEBUG_PRINT("Failed to append header to buffer");
+    return false;
+  }
+  if (_buf.append(bucket_name.data(), bucket_name.size())) {
+    DEBUG_PRINT("Failed to append bucket name to buffer");
+    return false;
+  }
+  ++_pipelined_count;
+  return true;
+}
+
+// helloRequest builds a Hello request for the Couchbase server, which
+// specifies the client features and capabilities.
+// This is typically the first request sent after connecting to the server.
+// It includes the agent name and a randomly generated connection ID in JSON
+// format.
+bool CouchbaseOperations::CouchbaseRequest::helloRequest() {
+  std::string agent = "brpc/1.0.0 (";
+#ifdef __APPLE__
+  agent += "Darwin/";
+#elif defined(__linux__)
+  agent += "Linux/";
+#else
+  agent += "UnknownOS/";
+#endif
+#if defined(__x86_64__)
+  agent += "x86_64";
+#elif defined(__aarch64__)
+  agent += "arm64";
+#else
+  agent += "unknown";
+#endif
+  agent += ";bssl/0x1010107f)";
+
+  // Generate a random connection ID as a hex string
+  unsigned char raw_id[CONNECTION_ID_SIZE];
+  FILE* urandom = fopen("/dev/urandom", "rb");
+  if (!urandom ||
+      fread(raw_id, 1, CONNECTION_ID_SIZE, urandom) != CONNECTION_ID_SIZE) {
+    if (urandom) fclose(urandom);
+    DEBUG_PRINT("Failed to generate random connection id");
+    return false;
+  }
+  fclose(urandom);
+  char hex_id[RANDOM_ID_HEX_SIZE] = {0};
+  for (int i = 0; i < CONNECTION_ID_SIZE; ++i) {
+    snprintf(hex_id + i * 2, sizeof(hex_id) - i * 2, "%02x", raw_id[i]);
+  }
+
+  // Format key as JSON: {"a":"agent","i":"hex_id"}
+  std::string key =
+      std::string("{\"a\":\"") + agent + "\",\"i\":\"" + hex_id + "\"}";
+
+  const uint16_t key_len = key.size();
+  uint16_t features[] = {
+      butil::HostToNet16(0x0001),  // Datatype
+      butil::HostToNet16(0x0006),  // XATTR
+      butil::HostToNet16(0x0007),  // XError
+      butil::HostToNet16(0x000b),  // JSON
+      butil::HostToNet16(0x0012)   // Collections
+  };
+
+  const uint32_t value_len = sizeof(features);
+  const uint32_t total_body_len = key_len + value_len;
+
+  const policy::CouchbaseRequestHeader header = {
+      policy::CB_MAGIC_REQUEST,
+      policy::CB_HELLO_SELECT_FEATURES,
+      butil::HostToNet16(key_len),         // key length
+      0,                                   // extras length
+      policy::CB_BINARY_RAW_BYTES,         // data type
+      0,                                   // vbucket id
+      butil::HostToNet32(total_body_len),  // total body length
+      0,                                   // opaque
+      0                                    // cas value
+  };
+
+  if (_buf.append(&header, sizeof(header))) {
+    DEBUG_PRINT("Failed to append Hello header to buffer");
+    return false;
+  }
+  if (_buf.append(key.data(), key_len)) {
+    DEBUG_PRINT("Failed to append Hello JSON key to buffer");
+    return false;
+  }
+  if (_buf.append(reinterpret_cast<const char*>(features), value_len)) {
+    DEBUG_PRINT("Failed to append Hello features to buffer");
+    return false;
+  }
+  ++_pipelined_count;
+  return true;
+}
+
+bool CouchbaseOperations::CouchbaseRequest::authenticateRequest(
+    const butil::StringPiece& username, const butil::StringPiece& password) {
+  if (username.empty() || password.empty()) {
+    DEBUG_PRINT("Empty username or password");
+    return false;
+  }
+  // Negotiate the client features first: helloRequest() appends the Hello
+  // packet to the buffer ahead of the SASL auth packet.
+  if (!helloRequest()) {
+    DEBUG_PRINT("Failed to build helloRequest for authentication");
+    return false;
+  }
+  // Construct the request header
+  constexpr char kPlainAuthCommand[] = "PLAIN";
+  constexpr char kPadding[1] = {'\0'};
+  const brpc::policy::CouchbaseRequestHeader header = {
+      brpc::policy::CB_MAGIC_REQUEST,
+      brpc::policy::CB_BINARY_SASL_AUTH,
+      butil::HostToNet16(sizeof(kPlainAuthCommand) - 1),
+      0,
+      0,
+      0,
+      butil::HostToNet32(sizeof(kPlainAuthCommand) + 1 + username.length() * 2 +
+                         password.length()),
+      0,
+      0};
+  std::string auth_str;
+  auth_str.reserve(sizeof(header) + sizeof(kPlainAuthCommand) - 1 +
+                   username.size() * 2 + password.size() + 2);
+  auth_str.append(reinterpret_cast<const char*>(&header), sizeof(header));
+  auth_str.append(kPlainAuthCommand, sizeof(kPlainAuthCommand) - 1);
+  auth_str.append(username.data(), username.size());
+  auth_str.append(kPadding, sizeof(kPadding));
+  auth_str.append(username.data(), username.size());
+  auth_str.append(kPadding, sizeof(kPadding));
+  auth_str.append(password.data(), password.size());
+  if (_buf.append(auth_str.data(), auth_str.size())) {
+    DEBUG_PRINT("Failed to append auth string to buffer");
+    return false;
+  }
+  ++_pipelined_count;
+  return true;
+}
+
+void CouchbaseOperations::CouchbaseRequest::MergeFrom(
+    const CouchbaseRequest& from) {
+  CHECK_NE(&from, this);
+  _buf.append(from._buf);
+  _pipelined_count += from._pipelined_count;
+}
+
+bool CouchbaseOperations::CouchbaseRequest::IsInitialized() const {
+  return _pipelined_count != 0;
+}
+
+void CouchbaseOperations::CouchbaseRequest::Swap(CouchbaseRequest* other) {
+  if (other != this) {
+    _buf.swap(other->_buf);
+    std::swap(_pipelined_count, other->_pipelined_count);
+    std::swap(_cached_size_, other->_cached_size_);
+  }
+}
+
+void CouchbaseOperations::CouchbaseResponse::sharedCtor() { _cached_size_ = 0; }
+
+void CouchbaseOperations::CouchbaseResponse::sharedDtor() {}
+
+void CouchbaseOperations::CouchbaseResponse::setCachedSize(int size) const {
+  _cached_size_ = size;
+}
+
+void CouchbaseOperations::CouchbaseResponse::Clear() {}
+
+void CouchbaseOperations::CouchbaseResponse::MergeFrom(
+    const CouchbaseResponse& from) {
+  CHECK_NE(&from, this);
+  _err = from._err;
+  _buf.append(from._buf);
+}
+
+bool CouchbaseOperations::CouchbaseResponse::IsInitialized() const {
+  return !_buf.empty();
+}
+
+void CouchbaseOperations::CouchbaseResponse::swap(CouchbaseResponse* other) {
+  if (other != this) {
+    _buf.swap(other->_buf);
+    std::swap(_cached_size_, other->_cached_size_);
+  }
+}
+
+// ===================================================================
+
+const char* CouchbaseOperations::CouchbaseResponse::statusStr(Status st) {
+  switch (st) {
+    case STATUS_SUCCESS:
+      return "SUCCESS";
+    case STATUS_KEY_ENOENT:
+      return "Key not found";
+    case STATUS_KEY_EEXISTS:
+      return "Key already exists";
+    case STATUS_E2BIG:
+      return "Value too large";
+    case STATUS_EINVAL:
+      return "Invalid arguments";
+    case STATUS_NOT_STORED:
+      return "Item not stored";
+    case STATUS_DELTA_BADVAL:
+      return "Invalid delta value for increment/decrement";
+    case STATUS_VBUCKET_BELONGS_TO_ANOTHER_SERVER:
+      return "VBucket belongs to another server";
+    case STATUS_AUTH_ERROR:
+      return "Authentication failed";
+    case STATUS_AUTH_CONTINUE:
+      return "Authentication continue";
+    case STATUS_ERANGE:
+      return "Range error";
+    case STATUS_ROLLBACK:
+      return "Rollback required";
+    case STATUS_EACCESS:
+      return "Access denied";
+    case STATUS_NOT_INITIALIZED:
+      return "Not initialized";
+    case STATUS_UNKNOWN_COMMAND:
+      return "Unknown command";
+    case STATUS_ENOMEM:
+      return "Out of memory";
+    case STATUS_NOT_SUPPORTED:
+      return "Operation not supported";
+    case STATUS_EINTERNAL:
+      return "Internal server error";
+    case STATUS_EBUSY:
+      return "Server busy";
+    case STATUS_ETMPFAIL:
+      return "Temporary failure";
+    case STATUS_UNKNOWN_COLLECTION:
+      return "Unknown collection";
+    case STATUS_NO_COLLECTIONS_MANIFEST:
+      return "No collections manifest";
+    case STATUS_CANNOT_APPLY_COLLECTIONS_MANIFEST:
+      return "Cannot apply collections manifest";
+    case STATUS_COLLECTIONS_MANIFEST_IS_AHEAD:
+      return "Collections manifest is ahead";
+    case STATUS_UNKNOWN_SCOPE:
+      return "Unknown scope";
+    case STATUS_DCP_STREAM_ID_INVALID:
+      return "Invalid DCP stream ID";
+    case STATUS_DURABILITY_INVALID_LEVEL:
+      return "Invalid durability level";
+    case STATUS_DURABILITY_IMPOSSIBLE:
+      return "Durability requirements impossible";
+    case STATUS_SYNC_WRITE_IN_PROGRESS:
+      return "Synchronous write in progress";
+    case STATUS_SYNC_WRITE_AMBIGUOUS:
+      return "Synchronous write result ambiguous";
+    case STATUS_SYNC_WRITE_RE_COMMIT_IN_PROGRESS:
+      return "Synchronous write re-commit in progress";
+    case STATUS_SUBDOC_PATH_NOT_FOUND:
+      return "Sub-document path not found";
+    case STATUS_SUBDOC_PATH_MISMATCH:
+      return "Sub-document path mismatch";
+    case STATUS_SUBDOC_PATH_EINVAL:
+      return "Invalid sub-document path";
+    case STATUS_SUBDOC_PATH_E2BIG:
+      return "Sub-document path too deep";
+    case STATUS_SUBDOC_DOC_E2DEEP:
+      return "Sub-document too deep";
+    case STATUS_SUBDOC_VALUE_CANTINSERT:
+      return "Cannot insert sub-document value";
+    case STATUS_SUBDOC_DOC_NOT_JSON:
+      return "Document is not JSON";
+    case STATUS_SUBDOC_NUM_E2BIG:
+      return "Sub-document number too large";
+    case STATUS_SUBDOC_DELTA_E2BIG:
+      return "Sub-document delta too large";
+    case STATUS_SUBDOC_PATH_EEXISTS:
+      return "Sub-document path already exists";
+    case STATUS_SUBDOC_VALUE_E2DEEP:
+      return "Sub-document value too deep";
+    case STATUS_SUBDOC_INVALID_COMBO:
+      return "Invalid sub-document operation combination";
+    case STATUS_SUBDOC_MULTI_PATH_FAILURE:
+      return "Sub-document multi-path operation failed";
+    case STATUS_SUBDOC_SUCCESS_DELETED:
+      return "Sub-document operation succeeded on deleted document";
+    case STATUS_SUBDOC_XATTR_INVALID_FLAG_COMBO:
+      return "Invalid extended attribute flag combination";
+    case STATUS_SUBDOC_XATTR_INVALID_KEY_COMBO:
+      return "Invalid extended attribute key combination";
+    case STATUS_SUBDOC_XATTR_UNKNOWN_MACRO:
+      return "Unknown extended attribute macro";
+    case STATUS_SUBDOC_XATTR_UNKNOWN_VATTR:
+      return "Unknown virtual extended attribute";
+    case STATUS_SUBDOC_XATTR_CANT_MODIFY_VATTR:
+      return "Cannot modify virtual extended attribute";
+    case STATUS_SUBDOC_MULTI_PATH_FAILURE_DELETED:
+      return "Sub-document multi-path operation failed on deleted document";
+    case STATUS_SUBDOC_INVALID_XATTR_ORDER:
+      return "Invalid extended attribute order";
+    case STATUS_SUBDOC_XATTR_UNKNOWN_VATTR_MACRO:
+      return "Unknown virtual extended attribute macro";
+    case STATUS_SUBDOC_CAN_ONLY_REVIVE_DELETED_DOCUMENTS:
+      return "Can only revive deleted documents";
+    case STATUS_SUBDOC_DELETED_DOCUMENT_CANT_HAVE_VALUE:
+      return "Deleted document cannot have a value";
+    case STATUS_XATTR_EINVAL:
+      return "Invalid extended attributes";
+  }
+  return "Unknown status";
+}
+
+// Helper method to format error messages with status codes
+std::string CouchbaseOperations::CouchbaseResponse::formatErrorMessage(
+    uint16_t status_code, const std::string& operation,
+    const std::string& error_msg) {
+  if (error_msg.empty()) {
+    return butil::string_printf("%s failed with status 0x%02x (%s)",
+                                operation.c_str(), status_code,
+                                statusStr((Status)status_code));
+  } else {
+    return butil::string_printf(
+        "%s failed with status 0x%02x (%s): %s", operation.c_str(), status_code,
+        statusStr((Status)status_code), error_msg.c_str());
+  }
+}
+
+// MUST NOT have extras.
+// MUST have key.
+// MUST NOT have value.
+bool CouchbaseOperations::CouchbaseRequest::getOrDelete(
+    uint8_t command, const butil::StringPiece& key, uint8_t coll_id) {
+  // Collection ID
+  uint8_t collection_id = coll_id;
+  uint16_t VBucket_id = hashCrc32(key.data(), key.size());
+  const policy::CouchbaseRequestHeader header = {
+      policy::CB_MAGIC_REQUEST, command,
+      butil::HostToNet16(key.size() + 1),  // key length + collection id byte
+      0,                                   // extras length
+      policy::CB_BINARY_RAW_BYTES,         // data type
+      butil::HostToNet16(VBucket_id),
+      butil::HostToNet32(key.size() +
+                         sizeof(collection_id)),  // total body length includes
+                                                  // key and collection id
+      0, 0};
+  if (_buf.append(&header, sizeof(header))) {
+    return false;
+  }
+  if (_buf.append(&collection_id, sizeof(collection_id))) {
+    return false;
+  }
+  if (_buf.append(key.data(), key.size())) {
+    return false;
+  }
+  ++_pipelined_count;
+  return true;
+}
+
+// Looks up the collection ID in the metadata cache; if it is not cached,
+// fetches the collection manifest from the server.
+bool CouchbaseOperations::CouchbaseRequest::getCachedOrFetchCollectionId(
+    std::string collection_name, uint8_t* coll_id,
+    brpc::CouchbaseManifestManager* metadata_tracking, brpc::Channel* channel,
+    const std::string& server, const std::string& selected_bucket,
+    std::unordered_map<std::string, CouchbaseManifestManager::CollectionManifest>*
+        local_cache) {
+  if (collection_name.empty()) {
+    DEBUG_PRINT("Empty collection name");
+    return false;
+  }
+  if (channel == nullptr) {
+    DEBUG_PRINT("No channel found, make sure to call Authenticate() first");
+    return false;
+  }
+  if (server.empty()) {
+    DEBUG_PRINT("Server is empty, make sure to call Authenticate() first");
+    return false;
+  }
+  if (selected_bucket.empty()) {
+    DEBUG_PRINT("No bucket selected, make sure to call SelectBucket() first");
+    return false;
+  }
+
+  brpc::CouchbaseManifestManager::CollectionManifest manifest;
+  // check if the server/bucket exists in the cached collection manifest
+  if (!metadata_tracking->getBucketToCollectionManifest(server, selected_bucket,
+                                                        &manifest)) {
+    DEBUG_PRINT("No cached collection manifest found for bucket "
+                << selected_bucket << " on server " << server
+                << ", fetching from server");
+    // No cached manifest found, fetch from server
+    if (!metadata_tracking->refreshCollectionManifest(
+            channel, server, selected_bucket, local_cache)) {
+      return false;
+    }
+    // local cache will also be updated in refreshCollectionManifest
+    // get the reference to collectionID from local cache
+    if (!getLocalCachedCollectionId(selected_bucket, "_default",
+                                    collection_name, coll_id)) {
+      // collectionID not found in the latest manifest fetched from server
+      return false;
+    }
+    // collectionID has been found in the latest manifest fetched from server
+    // and is stored in coll_id
+    return true;
+  } else {
+    // check if collection name to id mapping exists.
+    if (!metadata_tracking->getManifestToCollectionId(
+            &manifest, "_default", collection_name, coll_id)) {
+      // Not found in the cached manifest: refresh the manifest from the
+      // server and retry before giving up.
+      if (!metadata_tracking->refreshCollectionManifest(
+              channel, server, selected_bucket, local_cache)) {
+        return false;
+      }
+      // local cache will also be updated in refreshCollectionManifest
+      // get the reference to collectionID from local cache
+      if (!getLocalCachedCollectionId(selected_bucket, "_default",
+                                      collection_name, coll_id)) {
+        // collectionID not found in the latest manifest fetched from server
+        return false;
+      }
+      // collectionID has been found in the latest manifest fetched from server
+      // and is stored in coll_id
+      return true;
+    }
+    // update the caller-supplied local cache with the manifest from the
+    // global cache
+    (*local_cache)[selected_bucket] = manifest;
+    // collectionID found in the cached manifest
+    return true;
+  }
+}
+
+bool CouchbaseOperations::CouchbaseRequest::getRequest(
+    const butil::StringPiece& key, std::string collection_name,
+    brpc::Channel* channel, const std::string& server, const std::string& bucket) {
+  DEBUG_PRINT("getRequest called with key: "
+              << key << ", collection_name: " << collection_name
+              << ", server: " << server << ", bucket: " << bucket);
+  uint8_t coll_id = 0;  // default collection ID
+  if (collection_name != "_default") {
+    // check if the local cache is empty or not.
+    if (local_collection_manifest_cache->empty()) {
+      DEBUG_PRINT("Local collection manifest cache is empty in getRequest");
+      // local cache is empty: fall back to the global cache or the server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        DEBUG_PRINT(
+            "Failed to get collection id from global cache or server in "
+            "getRequest");
+        return false;
+      }
+    }
+    // check if the collection id is available in the local cache
+    else if (!getLocalCachedCollectionId(bucket, "_default", collection_name,
+                                         &coll_id)) {
+      DEBUG_PRINT("Collection id not found in local cache in getRequest");
+      // not in the local cache: check the global cache or fetch from server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        DEBUG_PRINT(
+            "Failed to get collection id from global cache or server in "
+            "getRequest");
+        return false;
+      }
+    }
+  }
+  DEBUG_PRINT("getRequest using coll_id: " << (int)coll_id);
+  return getOrDelete(policy::CB_BINARY_GET, key, coll_id);
+}
+
+bool CouchbaseOperations::CouchbaseRequest::deleteRequest(
+    const butil::StringPiece& key, std::string collection_name,
+    brpc::Channel* channel, const std::string& server, const std::string& bucket) {
+  DEBUG_PRINT("deleteRequest called with key: "
+              << key << ", collection_name: " << collection_name
+              << ", server: " << server << ", bucket: " << bucket);
+  uint8_t coll_id = 0;  // default collection ID
+  if (collection_name != "_default") {
+    // check if the local cache is empty or not.
+    if (local_collection_manifest_cache->empty()) {
+      DEBUG_PRINT("Local collection manifest cache is empty in deleteRequest");
+      // local cache is empty: fall back to the global cache or the server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        DEBUG_PRINT(
+            "Failed to get collection id from global cache or server in "
+            "deleteRequest");
+        return false;
+      }
+    }
+    // check if the collection id is available in the local cache
+    else if (!getLocalCachedCollectionId(bucket, "_default", collection_name,
+                                         &coll_id)) {
+      DEBUG_PRINT("Collection id not found in local cache in deleteRequest");
+      // not in the local cache: check the global cache or fetch from server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        DEBUG_PRINT(
+            "Failed to get collection id from global cache or server in "
+            "deleteRequest");
+        return false;
+      }
+    }
+  }
+  DEBUG_PRINT("deleteRequest using coll_id: " << (int)coll_id);
+  return getOrDelete(policy::CB_BINARY_DELETE, key, coll_id);
+}
+
+struct FlushHeaderWithExtras {
+  policy::CouchbaseRequestHeader header;
+  uint32_t exptime;
+} __attribute__((packed));
+BAIDU_CASSERT(sizeof(FlushHeaderWithExtras) == 28, must_match);
+
+bool CouchbaseOperations::CouchbaseResponse::popGet(butil::IOBuf* value,
+                                                    uint32_t* flags,
+                                                    uint64_t* cas_value) {
+  const size_t n = _buf.size();
+  policy::CouchbaseResponseHeader header;
+  if (n < sizeof(header)) {
+    butil::string_printf(&_err, "buffer is too small to contain a header");
+    return false;
+  }
+  _buf.copy_to(&header, sizeof(header));
+  if (header.command != (uint8_t)policy::CB_BINARY_GET) {
+    butil::string_printf(&_err, "not a GET response");
+    return false;
+  }
+  if (n < sizeof(header) + header.total_body_length) {
+    butil::string_printf(&_err, "response=%u < header=%u + body=%u",
+                         (unsigned)n, (unsigned)sizeof(header),
+                         header.total_body_length);
+    return false;
+  }
+  if (header.status != (uint16_t)STATUS_SUCCESS) {
+    if (DBUG && header.extras_length != 0) {
+      DEBUG_PRINT("GET response must not have flags");
+    }
+    if (DBUG && header.key_length != 0) {
+      DEBUG_PRINT("GET response must not have key");
+    }
+    const int value_size = (int)header.total_body_length -
+                           (int)header.extras_length - (int)header.key_length;
+    _status_code = header.status;
+    if (value_size < 0) {
+      butil::string_printf(&_err, "value_size=%d is negative", value_size);
+      return false;
+    }
+    _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
+    if (value_size > 0) {
+      std::string error_msg;
+      _buf.cutn(&error_msg, value_size);
+      _err = formatErrorMessage(header.status, "GET operation", error_msg);
+    } else {
+      _err = formatErrorMessage(header.status, "GET operation");
+    }
+    return false;
+  }
+  if (header.extras_length != 4u) {
+    butil::string_printf(
+        &_err, "GET response must have flags as extras, actual length=%u",
+        header.extras_length);
+    return false;
+  }
+  if (header.key_length != 0) {
+    butil::string_printf(&_err, "GET response must not have key");
+    return false;
+  }
+  const int value_size = (int)header.total_body_length -
+                         (int)header.extras_length - (int)header.key_length;
+  if (value_size < 0) {
+    butil::string_printf(&_err, "value_size=%d is negative", value_size);
+    return false;
+  }
+  _buf.pop_front(sizeof(header));
+  uint32_t raw_flags = 0;
+  _buf.cutn(&raw_flags, sizeof(raw_flags));
+  if (flags) {
+    *flags = butil::NetToHost32(raw_flags);
+  }
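+  if (value) {
+    value->clear();
+    _buf.cutn(value, value_size);
+  } else {
+    // Still consume the value so the next pipelined response stays aligned.
+    _buf.pop_front(value_size);
+  }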
+  if (cas_value) {
+    *cas_value = header.cas_value;
+  }
+  _err.clear();
+  return true;
+}
+
+bool CouchbaseOperations::CouchbaseResponse::popGet(std::string* value,
+                                                    uint32_t* flags,
+                                                    uint64_t* cas_value) {
+  butil::IOBuf tmp;
+  if (popGet(&tmp, flags, cas_value)) {
+    tmp.copy_to(value);
+    return true;
+  }
+  return false;
+}
+
+// MUST NOT have extras
+// MUST NOT have key
+// MUST NOT have value
+bool CouchbaseOperations::CouchbaseResponse::popDelete() {
+  return popStore(policy::CB_BINARY_DELETE, NULL);
+}
+
+struct StoreHeaderWithExtras {
+  policy::CouchbaseRequestHeader header;
+  uint32_t flags;
+  uint32_t exptime;
+} __attribute__((packed));
+BAIDU_CASSERT(sizeof(StoreHeaderWithExtras) == 32, must_match);
+const size_t STORE_EXTRAS =
+    sizeof(StoreHeaderWithExtras) - sizeof(policy::CouchbaseRequestHeader);
+// MUST have extras.
+// MUST have key.
+// MAY have value.
+// Extra data for set/add/replace:
+// Byte/     0       |       1       |       2       |       3       |
+//    /              |               |               |               |
+//   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+//   +---------------+---------------+---------------+---------------+
+//  0| Flags                                                         |
+//   +---------------+---------------+---------------+---------------+
+//  4| Expiration                                                    |
+//   +---------------+---------------+---------------+---------------+
+//   Total 8 bytes
+bool CouchbaseOperations::CouchbaseRequest::store(
+    uint8_t command, const butil::StringPiece& key,
+    const butil::StringPiece& value, uint32_t flags, uint32_t exptime,
+    uint64_t cas_value, uint8_t coll_id) {
+  // The collection ID is sent as a one-byte prefix of the key. A single byte
+  // is only a valid LEB128 encoding for collection IDs below 0x80.
+  uint8_t collection_id = coll_id;
+  uint16_t vBucket_id = hashCrc32(key.data(), key.size());
+  StoreHeaderWithExtras header_with_extras = {
+      {policy::CB_MAGIC_REQUEST, command,
+       // key length covers the one-byte collection ID prefix plus the key
+       butil::HostToNet16(sizeof(collection_id) + key.size()),
+       STORE_EXTRAS, policy::CB_JSON, butil::HostToNet16(vBucket_id),
+       butil::HostToNet32(STORE_EXTRAS + sizeof(collection_id) + key.size() +
+                          value.size()),  // total body length
+       0, butil::HostToNet64(cas_value)},
+      butil::HostToNet32(flags),
+      butil::HostToNet32(exptime)};
+  if (_buf.append(&header_with_extras, sizeof(header_with_extras))) {
+    return false;
+  }
+  if (_buf.append(&collection_id, sizeof(collection_id))) {
+    return false;
+  }
+  if (_buf.append(key.data(), key.size())) {
+    return false;
+  }
+  if (_buf.append(value.data(), value.size())) {
+    return false;
+  }
+  ++_pipelined_count;
+  return true;
+}
+
+// MUST have CAS
+// MUST NOT have extras
+// MUST NOT have key
+// MUST NOT have value
+bool CouchbaseOperations::CouchbaseResponse::popStore(uint8_t command,
+                                                      uint64_t* cas_value) {
+  const size_t n = _buf.size();
+  policy::CouchbaseResponseHeader header;
+  if (n < sizeof(header)) {
+    butil::string_printf(&_err, "buffer is too small to contain a header");
+    return false;
+  }
+  _buf.copy_to(&header, sizeof(header));
+  if (header.command != command) {
+    butil::string_printf(&_err, "Not a STORE response");
+    return false;
+  }
+  if (n < sizeof(header) + header.total_body_length) {
+    butil::string_printf(&_err, "Not enough data");
+    return false;
+  }
+  if (DBUG && header.extras_length != 0) {
+    DEBUG_PRINT("STORE response must not have extras");
+  }
+  if (DBUG && header.key_length != 0) {
+    DEBUG_PRINT("STORE response must not have key");
+  }
+  int value_size = (int)header.total_body_length - (int)header.extras_length -
+                   (int)header.key_length;
+  if (header.status != (uint16_t)STATUS_SUCCESS) {
+    _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
+    _status_code = header.status;
+    if (value_size > 0) {
+      std::string error_msg;
+      _buf.cutn(&error_msg, value_size);
+      _err = formatErrorMessage(
+          header.status, couchbaseBinaryCommandToString(command), error_msg);
+    } else {
+      _err = formatErrorMessage(header.status,
+                                couchbaseBinaryCommandToString(command));
+    }
+    return false;
+  }
+  if (DBUG && value_size != 0) {
+    DEBUG_PRINT("STORE response must not have value, actually=" << value_size);
+  }
+  _buf.pop_front(sizeof(header) + header.total_body_length);
+  if (cas_value) {
+    *cas_value = header.cas_value;
+  }
+  _err.clear();
+  return true;
+}
+
+const char*
+CouchbaseOperations::CouchbaseResponse::couchbaseBinaryCommandToString(
+    uint8_t cmd) {
+  switch (cmd) {
+    case 0x1f:
+      return "CB_HELLO_SELECT_FEATURES";
+    case 0x89:
+      return "CB_SELECT_BUCKET";
+    case 0xbc:
+      return "CB_GET_SCOPE_ID";
+    case 0x00:
+      return "CB_BINARY_GET";
+    case 0x01:
+      return "CB_BINARY_SET";
+    case 0x02:
+      return "CB_BINARY_ADD";
+    case 0x03:
+      return "CB_BINARY_REPLACE";
+    case 0x04:
+      return "CB_BINARY_DELETE";
+    case 0x05:
+      return "CB_BINARY_INCREMENT";
+    case 0x06:
+      return "CB_BINARY_DECREMENT";
+    case 0x07:
+      return "CB_BINARY_QUIT";
+    case 0x08:
+      return "CB_BINARY_FLUSH";
+    case 0x09:
+      return "CB_BINARY_GETQ";
+    case 0x0a:
+      return "CB_BINARY_NOOP";
+    case 0x0b:
+      return "CB_BINARY_VERSION";
+    case 0x0c:
+      return "CB_BINARY_GETK";
+    case 0x0d:
+      return "CB_BINARY_GETKQ";
+    case 0x0e:
+      return "CB_BINARY_APPEND";
+    case 0x0f:
+      return "CB_BINARY_PREPEND";
+    case 0x10:
+      return "CB_BINARY_STAT";
+    case 0x11:
+      return "CB_BINARY_SETQ";
+    case 0x12:
+      return "CB_BINARY_ADDQ";
+    case 0x13:
+      return "CB_BINARY_REPLACEQ";
+    case 0x14:
+      return "CB_BINARY_DELETEQ";
+    case 0x15:
+      return "CB_BINARY_INCREMENTQ";
+    case 0x16:
+      return "CB_BINARY_DECREMENTQ";
+    case 0x17:
+      return "CB_BINARY_QUITQ";
+    case 0x18:
+      return "CB_BINARY_FLUSHQ";
+    case 0x19:
+      return "CB_BINARY_APPENDQ";
+    case 0x1a:
+      return "CB_BINARY_PREPENDQ";
+    case 0x1c:
+      return "CB_BINARY_TOUCH";
+    case 0x1d:
+      return "CB_BINARY_GAT";
+    case 0x1e:
+      return "CB_BINARY_GATQ";
+    case 0x23:
+      return "CB_BINARY_GATK";
+    case 0x24:
+      return "CB_BINARY_GATKQ";
+    case 0x20:
+      return "CB_BINARY_SASL_LIST_MECHS";
+    case 0x21:
+      return "CB_BINARY_SASL_AUTH";
+    case 0x22:
+      return "CB_BINARY_SASL_STEP";
+    case 0xb5:
+      return "CB_GET_CLUSTER_CONFIG";
+    case 0xba:
+      return "CB_GET_COLLECTIONS_MANIFEST";
+    case 0xbb:
+      return "CB_COLLECTIONS_GET_CID";
+    default:
+      return "UNKNOWN_COMMAND";
+  }
+}
+
+bool CouchbaseOperations::CouchbaseRequest::upsertRequest(
+    const butil::StringPiece& key, const butil::StringPiece& value,
+    uint32_t flags, uint32_t exptime, uint64_t cas_value,
+    std::string collection_name, brpc::Channel* channel, const std::string& server,
+    const std::string& bucket) {
+  DEBUG_PRINT("upsertRequest called with key: "
+              << key << ", value: " << value
+              << ", collection_name: " << collection_name
+              << ", server: " << server << ", bucket: " << bucket);
+  uint8_t coll_id = 0;  // default collection ID
+  if (collection_name != "_default") {
+    // check if the local cache is empty or not.
+    if (local_collection_manifest_cache->empty()) {
+      DEBUG_PRINT("Local collection manifest cache is empty in upsertRequest");
+      // local cache is empty: fall back to the global cache or the server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        DEBUG_PRINT(
+            "Failed to get collection id from global cache or server in "
+            "upsertRequest");
+        return false;
+      }
+    }
+    // check if the collection id is available in the local cache
+    else if (!getLocalCachedCollectionId(bucket, "_default", collection_name,
+                                         &coll_id)) {
+      DEBUG_PRINT("Collection id not found in local cache in upsertRequest");
+      // not in the local cache: check the global cache or fetch from server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        DEBUG_PRINT(
+            "Failed to get collection id from global cache or server in "
+            "upsertRequest");
+        return false;
+      }
+    }
+  }
+  DEBUG_PRINT("upsertRequest using coll_id: " << (int)coll_id);
+  return store(policy::CB_BINARY_SET, key, value, flags, exptime, cas_value,
+               coll_id);
+}
+
+bool CouchbaseOperations::CouchbaseRequest::getCollectionManifest() {
+  const policy::CouchbaseRequestHeader header = {
+      policy::CB_MAGIC_REQUEST,
+      policy::CB_GET_COLLECTIONS_MANIFEST,
+      0,  // no key
+      0,  // no extras
+      policy::CB_BINARY_RAW_BYTES,
+      0,  // no vbucket
+      0,  // no body (no key, no extras, no value)
+      0,  // opaque
+      0   // no CAS
+  };
+  if (_buf.append(&header, sizeof(header))) {
+    return false;
+  }
+  ++_pipelined_count;
+  return true;
+}
+
+bool CouchbaseOperations::CouchbaseRequest::addRequest(
+    const butil::StringPiece& key, const butil::StringPiece& value,
+    uint32_t flags, uint32_t exptime, uint64_t cas_value,
+    std::string collection_name, brpc::Channel* channel, const std::string& server,
+    const std::string& bucket) {
+  DEBUG_PRINT("addRequest called with key: "
+              << key << ", value: " << value
+              << ", collection_name: " << collection_name
+              << ", server: " << server << ", bucket: " << bucket);
+  uint8_t coll_id = 0;  // default collection ID
+  if (collection_name != "_default") {
+    // check if the local cache is empty or not.
+    if (local_collection_manifest_cache->empty()) {
+      DEBUG_PRINT("Local collection manifest cache is empty in addRequest");
+      // local cache is empty: fall back to the global cache or the server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        DEBUG_PRINT(
+            "Failed to get collection id from global cache or server in "
+            "addRequest");
+        return false;
+      }
+    }
+    // check if the collection id is available in the local cache
+    else if (!getLocalCachedCollectionId(bucket, "_default", collection_name,
+                                         &coll_id)) {
+      DEBUG_PRINT("Collection id not found in local cache in addRequest");
+      // not in the local cache: check the global cache or fetch from server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        DEBUG_PRINT(
+            "Failed to get collection id from global cache or server in "
+            "addRequest");
+        return false;
+      }
+    }
+  }
+  DEBUG_PRINT("addRequest using coll_id: " << (int)coll_id);
+  return store(policy::CB_BINARY_ADD, key, value, flags, exptime, cas_value,
+               coll_id);
+}
+
+bool CouchbaseOperations::CouchbaseRequest::appendRequest(
+    const butil::StringPiece& key, const butil::StringPiece& value,
+    uint32_t flags, uint32_t exptime, uint64_t cas_value,
+    std::string collection_name, brpc::Channel* channel, const std::string& server,
+    const std::string& bucket) {
+  if (value.empty()) {
+    DEBUG_PRINT("value to append must be non-empty");
+    return false;
+  }
+  uint8_t coll_id = 0;  // default collection ID
+  if (collection_name != "_default") {
+    // check if the local cache is empty or not.
+    if (local_collection_manifest_cache->empty()) {
+      // local cache is empty: fall back to the global cache or the server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        return false;
+      }
+    }
+    // check if the collection id is available in the local cache
+    else if (!getLocalCachedCollectionId(bucket, "_default", collection_name,
+                                         &coll_id)) {
+      // not in the local cache: check the global cache or fetch from server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        return false;
+      }
+    }
+  }
+  return store(policy::CB_BINARY_APPEND, key, value, flags, exptime, cas_value,
+               coll_id);
+}
+
+bool CouchbaseOperations::CouchbaseRequest::prependRequest(
+    const butil::StringPiece& key, const butil::StringPiece& value,
+    uint32_t flags, uint32_t exptime, uint64_t cas_value,
+    std::string collection_name, brpc::Channel* channel, const std::string& server,
+    const std::string& bucket) {
+  if (value.empty()) {
+    DEBUG_PRINT("value to prepend must be non-empty");
+    return false;
+  }
+  uint8_t coll_id = 0;  // default collection ID
+  if (collection_name != "_default") {
+    // check if the local cache is empty or not.
+    if (local_collection_manifest_cache->empty()) {
+      // local cache is empty: fall back to the global cache or the server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        return false;
+      }
+    }
+    // check if the collection id is available in the local cache
+    else if (!getLocalCachedCollectionId(bucket, "_default", collection_name,
+                                         &coll_id)) {
+      // not in the local cache: check the global cache or fetch from server
+      if (!getCachedOrFetchCollectionId(
+              collection_name, &coll_id, metadata_tracking, channel, server,
+              bucket, local_collection_manifest_cache)) {
+        return false;
+      }
+    }
+  }
+  return store(policy::CB_BINARY_PREPEND, key, value, flags, exptime, cas_value,
+               coll_id);
+}
+
+bool CouchbaseOperations::CouchbaseResponse::popAuthenticate(
+    uint64_t* cas_value) {
+  return popStore(policy::CB_BINARY_SASL_AUTH, cas_value);
+}
+bool CouchbaseOperations::CouchbaseResponse::popHello(uint64_t* cas_value) {
+  return popStore(policy::CB_HELLO_SELECT_FEATURES, cas_value);
+}
+bool CouchbaseOperations::CouchbaseResponse::popUpsert(uint64_t* cas_value) {
+  return popStore(policy::CB_BINARY_SET, cas_value);
+}
+bool CouchbaseOperations::CouchbaseResponse::popAdd(uint64_t* cas_value) {
+  return popStore(policy::CB_BINARY_ADD, cas_value);
+}
+// Warning: Not tested
+// bool CouchbaseOperations::CouchbaseResponse::PopReplace(uint64_t* cas_value)
+// {
+//   return popStore(policy::CB_BINARY_REPLACE, cas_value);
+// }
+bool CouchbaseOperations::CouchbaseResponse::popAppend(uint64_t* cas_value) {
+  return popStore(policy::CB_BINARY_APPEND, cas_value);
+}
+bool CouchbaseOperations::CouchbaseResponse::popPrepend(uint64_t* cas_value) {
+  return popStore(policy::CB_BINARY_PREPEND, cas_value);
+}
+bool CouchbaseOperations::CouchbaseResponse::popSelectBucket(
+    uint64_t* cas_value) {
+  if (!popStore(policy::CB_SELECT_BUCKET, cas_value)) {
+    DEBUG_PRINT("Failed to select bucket: " << _err);
+    return false;
+  }
+  // Note: Bucket tracking is now handled at CouchbaseOperations level, not
+  // per-thread
+  return true;
+}
+// Collection-related response method
+bool CouchbaseOperations::CouchbaseResponse::popCollectionId(
+    uint8_t* collection_id) {
+  const size_t n = _buf.size();
+  policy::CouchbaseResponseHeader header;
+  if (n < sizeof(header)) {
+    butil::string_printf(&_err, "buffer is too small to contain a header");
+    return false;
+  }
+  _buf.copy_to(&header, sizeof(header));
+
+  if (header.command != policy::CB_COLLECTIONS_GET_CID) {
+    butil::string_printf(&_err, "Not a collection ID response");
+    return false;
+  }
+
+  // Making sure buffer has the whole body (extras + key + value)
+  if (n < sizeof(header) + header.total_body_length) {
+    butil::string_printf(&_err, "Not enough data");
+    return false;
+  }
+
+  if (header.status != 0) {
+    // handle error case
+    _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
+    // Possibly read error message from value if present
+    size_t value_size =
+        header.total_body_length - header.extras_length - header.key_length;
+    if (value_size > 0) {
+      std::string err_msg;
+      _buf.cutn(&err_msg, value_size);
+      _err =
+          formatErrorMessage(header.status, "Collection ID request", err_msg);
+    } else {
+      _err = formatErrorMessage(header.status, "Collection ID request");
+    }
+    return false;
+  }
+
+  // Success case: we expect extras_length >= 12 (8 bytes manifest + 4 bytes
+  // collection id)
+  if (header.extras_length < 12) {
+    butil::string_printf(&_err, "Extras too small to contain collection ID");
+    // remove the response from buffer so you don't re‐process
+    _buf.pop_front(sizeof(header) + header.total_body_length);
+    return false;
+  }
+
+  // Skip header
+  _buf.pop_front(sizeof(header));
+
+  // First 8 bytes of extras: manifest ID (u64, network byte order)
+  uint64_t manifest_id_net = 0;
+  _buf.cutn(&manifest_id_net, sizeof(manifest_id_net));
+  const uint64_t manifest_id = butil::NetToHost64(manifest_id_net);
+  DEBUG_PRINT("Manifest ID: " << manifest_id);
+
+  // Next 4 bytes of extras: collection ID (u32, network byte order).
+  // Callers track collection IDs as uint8_t, so larger IDs are truncated.
+  uint32_t cid_net = 0;
+  _buf.cutn(&cid_net, sizeof(cid_net));
+  *collection_id = static_cast<uint8_t>(butil::NetToHost32(cid_net));
+
+  // The 12 bytes read above are already consumed; drop only what is left of
+  // this response's body so the next pipelined response stays aligned.
+  const size_t consumed = sizeof(manifest_id_net) + sizeof(cid_net);
+  if (header.total_body_length > consumed) {
+    _buf.pop_front(header.total_body_length - consumed);
+  }
+  _err.clear();
+  return true;
+}
+
+bool CouchbaseOperations::CouchbaseResponse::popManifest(
+    std::string* manifest_json) {
+  const size_t n = _buf.size();
+  policy::CouchbaseResponseHeader header;
+  if (n < sizeof(header)) {
+    butil::string_printf(&_err, "buffer is too small to contain a header");
+    return false;
+  }
+  _buf.copy_to(&header, sizeof(header));
+
+  if (header.command != policy::CB_GET_COLLECTIONS_MANIFEST) {
+    butil::string_printf(&_err, "Not a get collections manifest response");
+    return false;
+  }
+
+  // Making sure buffer has the whole body (extras + key + value)
+  if (n < sizeof(header) + header.total_body_length) {
+    butil::string_printf(&_err, "Not enough data");
+    return false;
+  }
+
+  if (header.status != 0) {
+    // handle error case
+    if (header.extras_length != 0) {
+      DEBUG_PRINT("Get Collections Manifest response must not have extras");
+    }
+    if (header.key_length != 0) {
+      DEBUG_PRINT("Get Collections Manifest response must not have key");
+    }
+    _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
+    // Possibly read error message from value if present
+    size_t value_size =
+        header.total_body_length - header.extras_length - header.key_length;
+    if (value_size > 0) {
+      std::string err_msg;
+      _buf.cutn(&err_msg, value_size);
+      _err = formatErrorMessage(header.status, "Get Collections Manifest",
+                                err_msg);
+    } else {
+      _err = formatErrorMessage(header.status, "Get Collections Manifest");
+    }
+    return false;
+  }
+
+  // Success case: the manifest should be in the value section
+  size_t value_size =
+      header.total_body_length - header.extras_length - header.key_length;
+  if (value_size == 0) {
+    butil::string_printf(&_err, "No manifest data in response");
+    _buf.pop_front(sizeof(header) + header.total_body_length);
+    return false;
+  }
+
+  // Skip header and any extras/key
+  _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
+
+  // Read the manifest JSON from the value section
+  _buf.cutn(manifest_json, value_size);
+
+  _err.clear();
+  return true;
+}
+
+struct IncrHeaderWithExtras {
+  policy::CouchbaseRequestHeader header;
+  uint64_t delta;
+  uint64_t initial_value;
+  uint32_t exptime;
+} __attribute__((packed));
+BAIDU_CASSERT(sizeof(IncrHeaderWithExtras) == 44, must_match);
+
+const size_t INCR_EXTRAS =
+    sizeof(IncrHeaderWithExtras) - sizeof(policy::CouchbaseRequestHeader);
+
+// MUST have extras.
+// MUST have key.
+// MUST NOT have value.
+// Extra data for incr/decr:
+// Byte/     0       |       1       |       2       |       3       |
+//    /              |               |               |               |
+//   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+//   +---------------+---------------+---------------+---------------+
+//  0| Delta to add / subtract                                      |
+//   |                                                               |
+//   +---------------+---------------+---------------+---------------+
+//  8| Initial value                                                 |
+//   |                                                               |
+//   +---------------+---------------+---------------+---------------+
+// 16| Expiration                                                    |
+//   +---------------+---------------+---------------+---------------+
+//   Total 20 bytes
+bool CouchbaseOperations::CouchbaseRequest::counter(
+    uint8_t command, const butil::StringPiece& key, uint64_t delta,
+    uint64_t initial_value, uint32_t exptime) {
+  IncrHeaderWithExtras header_with_extras = {
+      {policy::CB_MAGIC_REQUEST, command, butil::HostToNet16(key.size()),
+       INCR_EXTRAS, policy::CB_BINARY_RAW_BYTES, 0,
+       butil::HostToNet32(INCR_EXTRAS + key.size()), 0, 0},
+      butil::HostToNet64(delta),
+      butil::HostToNet64(initial_value),
+      butil::HostToNet32(exptime)};
+  if (_buf.append(&header_with_extras, sizeof(header_with_extras))) {
+    return false;
+  }
+  if (_buf.append(key.data(), key.size())) {
+    return false;
+  }
+  ++_pipelined_count;
+  return true;
+}
+
+// MUST NOT have extras.
+// MUST NOT have key.
+// MUST have value.
+// Byte/     0       |       1       |       2       |       3       |
+//    /              |               |               |               |
+//   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+//   +---------------+---------------+---------------+---------------+
+//  0| 64-bit unsigned response.                                     |
+//   |                                                               |
+//   +---------------+---------------+---------------+---------------+
+//   Total 8 bytes
+bool CouchbaseOperations::CouchbaseResponse::popCounter(uint8_t command,
+                                                        uint64_t* new_value,
+                                                        uint64_t* cas_value) {
+  const size_t n = _buf.size();
+  policy::CouchbaseResponseHeader header;
+  if (n < sizeof(header)) {
+    butil::string_printf(&_err, "buffer is too small to contain a header");
+    return false;
+  }
+  _buf.copy_to(&header, sizeof(header));
+  if (header.command != command) {
+    butil::string_printf(&_err, "not an INCR/DECR response");
+    return false;
+  }
+  if (n < sizeof(header) + header.total_body_length) {
+    butil::string_printf(&_err, "response=%u < header=%u + body=%u",
+                         (unsigned)n, (unsigned)sizeof(header),
+                         header.total_body_length);
+    return false;
+  }
+  if (DBUG && header.extras_length != 0) {
+    DEBUG_PRINT("INCR/DECR response must not have extras");
+  }
+  if (DBUG && header.key_length != 0) {
+    DEBUG_PRINT("INCR/DECR response must not have key");
+  }
+  const int value_size = (int)header.total_body_length -
+                         (int)header.extras_length - (int)header.key_length;
+  _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
+
+  if (header.status != (uint16_t)STATUS_SUCCESS) {
+    if (value_size < 0) {
+      butil::string_printf(&_err, "value_size=%d is negative", value_size);
+    } else {
+      if (value_size > 0) {
+        std::string error_msg;
+        _buf.cutn(&error_msg, value_size);
+        _err =
+            formatErrorMessage(header.status, "Counter operation", error_msg);
+      } else {
+        _err = formatErrorMessage(header.status, "Counter operation");
+      }
+    }
+    return false;
+  }
+  if (value_size != 8) {
+    butil::string_printf(&_err, "value_size=%d is not 8", value_size);
+    return false;
+  }
+  uint64_t raw_value = 0;
+  _buf.cutn(&raw_value, sizeof(raw_value));
+  *new_value = butil::NetToHost64(raw_value);
+  if (cas_value) {
+    *cas_value = header.cas_value;
+  }
+  _err.clear();
+  return true;
+}
+
+// MUST NOT have extras.
+// MUST NOT have key.
+// MUST NOT have value.
+bool CouchbaseOperations::CouchbaseRequest::versionRequest() {
+  const policy::CouchbaseRequestHeader header = {policy::CB_MAGIC_REQUEST,
+                                                 policy::CB_BINARY_VERSION,
+                                                 0,
+                                                 0,
+                                                 policy::CB_BINARY_RAW_BYTES,
+                                                 0,
+                                                 0,
+                                                 0,
+                                                 0};
+  if (_buf.append(&header, sizeof(header))) {
+    return false;
+  }
+  ++_pipelined_count;
+  return true;
+}
+
+bool CouchbaseOperations::CouchbaseResponse::popVersion(std::string* version) {
+  const size_t n = _buf.size();
+  policy::CouchbaseResponseHeader header;
+  if (n < sizeof(header)) {
+    butil::string_printf(&_err, "buffer is too small to contain a header");
+    return false;
+  }
+  _buf.copy_to(&header, sizeof(header));
+  if (header.command != policy::CB_BINARY_VERSION) {
+    butil::string_printf(&_err, "not a VERSION response");
+    return false;
+  }
+  if (n < sizeof(header) + header.total_body_length) {
+    butil::string_printf(&_err, "response=%u < header=%u + body=%u",
+                         (unsigned)n, (unsigned)sizeof(header),
+                         header.total_body_length);
+    return false;
+  }
+  if (DBUG && header.extras_length != 0) {
+    DEBUG_PRINT("VERSION response must not have extras");
+  }
+  if (DBUG && header.key_length != 0) {
+    DEBUG_PRINT("VERSION response must not have key");
+  }
+  const int value_size = (int)header.total_body_length -
+                         (int)header.extras_length - (int)header.key_length;
+  _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
+  if (value_size < 0) {
+    butil::string_printf(&_err, "value_size=%d is negative", value_size);
+    return false;
+  }
+  if (header.status != (uint16_t)STATUS_SUCCESS) {
+    if (value_size > 0) {
+      std::string error_msg;
+      _buf.cutn(&error_msg, value_size);
+      _err = formatErrorMessage(header.status, "Version request", error_msg);
+    } else {
+      _err = formatErrorMessage(header.status, "Version request");
+    }
+    return false;
+  }
+  if (version) {
+    version->clear();
+    _buf.cutn(version, value_size);
+  }
+  _err.clear();
+  return true;
+}
+
+bool sendRequest(CouchbaseOperations::operation_type op_type, const std::string& key,
+                 const std::string& value, std::string collection_name,
+                 CouchbaseOperations::Result* result, brpc::Channel* channel,
+                 const std::string& server, const std::string& bucket,
+                 CouchbaseOperations::CouchbaseRequest* request,
+                 CouchbaseOperations::CouchbaseResponse* response) {
+  if (channel == nullptr) {
+    DEBUG_PRINT("No channel found, make sure to call authenticate() first");
+    result->success = false;
+    result->error_message =
+        "No channel found, make sure to call authenticate() first";
+    return false;
+  }
+  if (server.empty()) {
+    DEBUG_PRINT("Server is empty, make sure to call authenticate() first");
+    result->success = false;
+    result->error_message =
+        "Server is empty, make sure to call authenticate() first";
+    return false;
+  }
+  if (bucket.empty()) {
+    DEBUG_PRINT("No bucket selected, make sure to call selectBucket() first");
+    result->success = false;
+    result->error_message =
+        "No bucket selected, make sure to call selectBucket() first";
+    return false;
+  }
+  brpc::Controller cntl;
+  bool request_created = false;
+  switch (op_type) {
+    case CouchbaseOperations::GET:
+      request_created =
+          request->getRequest(key, collection_name, channel, server, bucket);
+      break;
+    case CouchbaseOperations::UPSERT:
+      request_created = request->upsertRequest(
+          key, value, 0, 0, 0, collection_name, channel, server, bucket);
+      break;
+    case CouchbaseOperations::ADD:
+      request_created = request->addRequest(
+          key, value, 0, 0, 0, collection_name, channel, server, bucket);
+      break;
+    case CouchbaseOperations::APPEND:
+      request_created = request->appendRequest(
+          key, value, 0, 0, 0, collection_name, channel, server, bucket);
+      break;
+    case CouchbaseOperations::PREPEND:
+      request_created = request->prependRequest(
+          key, value, 0, 0, 0, collection_name, channel, server, bucket);
+      break;
+    case CouchbaseOperations::DELETE:
+      request_created =
+          request->deleteRequest(key, collection_name, channel, server, bucket);
+      break;
+    default:
+      DEBUG_PRINT("Unsupported operation type");
+      result->success = false;
+      result->value = "";
+      result->error_message = "Unsupported operation type";
+      return false;
+  }
+  if (!request_created) {
+    DEBUG_PRINT("CollectionID does not exist. Operation type: " << op_type);
+    result->success = false;
+    result->value = "";
+    result->error_message = "CollectionID does not exist. Operation type: " +
+                            std::to_string(op_type);
+    result->status_code = 0x88;  // using 0x88 as the only possible failure code
+                                 // that indicates the collectionID is not found
+    return false;
+  }
+  channel->CallMethod(NULL, &cntl, request, response, NULL);
+  if (cntl.Failed()) {
+    DEBUG_PRINT("Failed to perform operation on key: "
+                << key << " to Couchbase: " << cntl.ErrorText());
+    result->success = false;
+    result->value = "";
+    result->error_message = cntl.ErrorText();
+    return false;
+  }
+  if (op_type == CouchbaseOperations::GET) {
+    std::string value;
+    uint32_t flags = 0;
+    uint64_t cas = 0;
+    if (response->popGet(&value, &flags, &cas) == false) {
+      result->success = false;
+      result->value = "";
+      result->error_message = response->lastError();
+      result->status_code = response->_status_code;
+      if (result->status_code == 0x88) {
+        DEBUG_PRINT(
+            "CollectionID does not exist on server, need to refresh collection "
+            "manifest from server");
+        // We could call sendRequest recursively here, but if the
+        // collectionID keeps changing, that would lead to unbounded
+        // recursion and eventually a stack overflow. So we retry once here
+        // instead and return failure if it still fails.
+
+        // (0x88) unknown collection: the collection manifest has been
+        // updated on the server side, so the collectionID present in the
+        // local/global cache is no longer valid. This can happen if a
+        // collection is deleted and recreated with the same name.
+        if (!request->metadata_tracking->refreshCollectionManifest(
+                channel, server, bucket,
+                request->local_collection_manifest_cache)) {
+          DEBUG_PRINT("Failed to refresh collection manifest");
+          result->error_message = "Failed to refresh collection manifest";
+        } else {
+          DEBUG_PRINT("Successfully refreshed collection manifest");
+          // retry the request;
+          request->Clear();
+          response->Clear();
+          cntl.Reset();
+          if (!request->getRequest(key, collection_name, channel, server,
+                                   bucket)) {
+            DEBUG_PRINT("CollectionID does not exist.");
+            result->success = false;
+            result->value = "";
+            result->error_message = "CollectionID does not exist.";
+            result->status_code =
+                0x88;  // using 0x88 as the only possible failure code that
+                       // indicates the collectionID is not found
+            return false;
+          }
+          channel->CallMethod(NULL, &cntl, request, response, NULL);
+          if (cntl.Failed()) {
+            DEBUG_PRINT("Failed to perform operation on key: "
+                        << key << " to Couchbase: " << cntl.ErrorText());
+            result->success = false;
+            result->value = "";
+            result->error_message = cntl.ErrorText();
+            return false;  // return on failure
+          }
+          if (response->popGet(&value, &flags, &cas) == false) {
+            result->success = false;
+            result->value = "";
+            result->error_message = response->lastError();
+            result->status_code = response->_status_code;
+            return false;  // return on failure
+          }
+          // Successfully got the value after retry
+          result->success = true;
+          result->value = value;
+          result->status_code = 0;
+          return true;
+        }
+      }
+      return false;
+    }
+    // Successfully got the value
+    result->success = true;
+    result->value = value;
+    result->status_code = 0;
+    return true;
+  } else {
+    uint64_t cas_value = 0;
+    // pop response on the basis of operation type
+    bool pop_success = false;
+    switch (op_type) {
+      case CouchbaseOperations::UPSERT:
+        pop_success = response->popUpsert(&cas_value);
+        break;
+      case CouchbaseOperations::ADD:
+        pop_success = response->popAdd(&cas_value);
+        break;
+      case CouchbaseOperations::APPEND:
+        pop_success = response->popAppend(&cas_value);
+        break;
+      case CouchbaseOperations::PREPEND:
+        pop_success = response->popPrepend(&cas_value);
+        break;
+      case CouchbaseOperations::DELETE:
+        pop_success = response->popDelete();
+        break;
+      default:
+        DEBUG_PRINT("Unsupported operation type in response pop");
+        result->success = false;
+        result->value = "";
+        result->error_message = "Unsupported operation type in response pop";
+        return false;
+    }
+    if (!pop_success) {
+      result->success = false;
+      result->value = "";
+      result->error_message = response->lastError();
+      result->status_code = response->_status_code;
+      if (result->status_code == 0x88) {
+        // (0x88) unknown collection: this typically means that the
+        // collection manifest has been updated on the server side and the
+        // client has a stale copy of it. In this case, we need to refresh
+        // the collection manifest and retry the operation.
+        if (!request->metadata_tracking->refreshCollectionManifest(
+                channel, server, bucket,
+                request->local_collection_manifest_cache)) {
+          DEBUG_PRINT("Failed to refresh collection manifest");
+          result->error_message = "Failed to refresh collection manifest";
+          return false;
+        }
+        // We could call sendRequest recursively here, but if the
+        // collectionID keeps changing, that would lead to unbounded
+        // recursion and eventually a stack overflow. So we retry once here
+        // instead and return failure if it still fails.
+        DEBUG_PRINT("Successfully refreshed collection manifest");
+        // Retry the request. The controller must be reset before it is
+        // reused for a second call.
+        request->Clear();
+        response->Clear();
+        cntl.Reset();
+        // Rebuild the request and check that it was created, mirroring the
+        // GET retry path.
+        request_created = false;
+        switch (op_type) {
+          case CouchbaseOperations::UPSERT:
+            request_created = request->upsertRequest(
+                key, value, 0, 0, 0, collection_name, channel, server, bucket);
+            break;
+          case CouchbaseOperations::ADD:
+            request_created = request->addRequest(
+                key, value, 0, 0, 0, collection_name, channel, server, bucket);
+            break;
+          case CouchbaseOperations::APPEND:
+            request_created = request->appendRequest(
+                key, value, 0, 0, 0, collection_name, channel, server, bucket);
+            break;
+          case CouchbaseOperations::PREPEND:
+            request_created = request->prependRequest(
+                key, value, 0, 0, 0, collection_name, channel, server, bucket);
+            break;
+          case CouchbaseOperations::DELETE:
+            request_created = request->deleteRequest(key, collection_name,
+                                                     channel, server, bucket);
+            break;
+          default:
+            DEBUG_PRINT("Unsupported operation type");
+            result->success = false;
+            result->value = "";
+            result->error_message = "Unsupported operation type";
+            return false;
+        }
+        if (!request_created) {
+          DEBUG_PRINT("CollectionID does not exist.");
+          result->success = false;
+          result->value = "";
+          result->error_message = "CollectionID does not exist.";
+          result->status_code = 0x88;  // collectionID not found
+          return false;
+        }
+        channel->CallMethod(NULL, &cntl, request, response, NULL);
+        if (cntl.Failed()) {
+          DEBUG_PRINT("Failed to perform operation on key: "
+                      << key << " to Couchbase: " << cntl.ErrorText());
+          result->success = false;
+          result->value = "";
+          result->error_message = cntl.ErrorText();
+          return false;  // return on failure
+        }
+        pop_success = false;
+        switch (op_type) {
+          case CouchbaseOperations::UPSERT:
+            pop_success = response->popUpsert(&cas_value);
+            break;
+          case CouchbaseOperations::ADD:
+            pop_success = response->popAdd(&cas_value);
+            break;
+          case CouchbaseOperations::APPEND:
+            pop_success = response->popAppend(&cas_value);
+            break;
+          case CouchbaseOperations::PREPEND:
+            pop_success = response->popPrepend(&cas_value);
+            break;
+          case CouchbaseOperations::DELETE:
+            pop_success = response->popDelete();
+            break;
+          default:
+            DEBUG_PRINT("Unsupported operation type in response pop");
+            result->success = false;
+            result->value = "";
+            result->error_message =
+                "Unsupported operation type in response pop";
+            return false;
+        }
+        if (!pop_success) {
+          result->success = false;
+          result->value = "";
+          result->error_message = response->lastError();
+          result->status_code = response->_status_code;
+          return false;  // return on failure
+        }
+        // Successfully performed the operation after retry
+        result->success = true;
+        result->value = "";
+        result->status_code = 0;
+        return true;
+      }
+      return false;
+    }
+    // Successfully performed the operation
+    // Note: For operations other than GET, we don't have a value to return
+    // so we return empty std::string for value.
+    result->success = true;
+    result->value = "";
+    result->status_code = 0;
+    return true;
+  }
+}
+
+CouchbaseOperations::Result CouchbaseOperations::get(const std::string& key,
+                                                     std::string collection_name) {
+  // Build the request/response pair and send it over the channel that
+  // authenticate() created for this instance.
+  CouchbaseRequest request(&local_bucket_to_collection_manifest_);
+  CouchbaseResponse response;
+  CouchbaseOperations::Result result;
+  sendRequest(CouchbaseOperations::GET, key, "", collection_name, &result,
+              channel_, server_address_, selected_bucket_, &request, &response);
+  return result;
+}
+
+bool CouchbaseOperations::CouchbaseRequest::getLocalCachedCollectionId(
+    const std::string& bucket, const std::string& scope, const std::string& collection,
+    uint8_t* collection_id) {
+  if (bucket.empty() || scope.empty() || collection.empty()) {
+    DEBUG_PRINT("Bucket, scope, and collection names must be non-empty");
+    return false;
+  }
+  // Look up bucket -> scope -> collection with one find() per level.
+  auto bucket_it = local_collection_manifest_cache->find(bucket);
+  if (bucket_it == local_collection_manifest_cache->end()) {
+    DEBUG_PRINT("Bucket name not found in local cache: " << bucket);
+    return false;
+  }
+  auto& scope_map = bucket_it->second.scope_to_collection_id_map;
+  auto scope_it = scope_map.find(scope);
+  if (scope_it == scope_map.end()) {
+    DEBUG_PRINT("Scope name not found in local cache: " << scope);
+    return false;
+  }
+  auto collection_it = scope_it->second.find(collection);
+  if (collection_it == scope_it->second.end()) {
+    DEBUG_PRINT("Collection name not found in local cache: " << collection);
+    return false;
+  }
+  *collection_id = collection_it->second;
+  return true;
+}
+
+CouchbaseOperations::Result CouchbaseOperations::upsert(
+    const std::string& key, const std::string& value, std::string collection_name) {
+  CouchbaseRequest request(&local_bucket_to_collection_manifest_);
+  CouchbaseResponse response;
+  CouchbaseOperations::Result result;
+  sendRequest(CouchbaseOperations::UPSERT, key, value, collection_name, &result,
+              channel_, server_address_, selected_bucket_, &request, &response);
+  return result;
+}
+
+CouchbaseOperations::Result CouchbaseOperations::delete_(
+    const std::string& key, std::string collection_name) {
+  CouchbaseRequest request(&local_bucket_to_collection_manifest_);
+  CouchbaseResponse response;
+  CouchbaseOperations::Result result;
+  sendRequest(CouchbaseOperations::DELETE, key, "", collection_name, &result,
+              channel_, server_address_, selected_bucket_, &request, &response);
+  return result;
+}
+
+CouchbaseOperations::Result CouchbaseOperations::add(const std::string& key,
+                                                     const std::string& value,
+                                                     std::string collection_name) {
+  CouchbaseRequest request(&local_bucket_to_collection_manifest_);
+  CouchbaseResponse response;
+  CouchbaseOperations::Result result;
+  sendRequest(CouchbaseOperations::ADD, key, value, collection_name, &result,
+              channel_, server_address_, selected_bucket_, &request, &response);
+  return result;
+}
+
+CouchbaseOperations::Result CouchbaseOperations::authenticate(
+    const std::string& username, const std::string& password,
+    const std::string& server_address, const std::string& bucket_name) {
+  return authenticateAll(username, password, server_address, bucket_name, false,
+                         "");
+}
+
+CouchbaseOperations::Result CouchbaseOperations::authenticateSSL(
+    const std::string& username, const std::string& password,
+    const std::string& server_address, const std::string& bucket_name,
+    std::string path_to_cert) {
+  return authenticateAll(username, password, server_address, bucket_name, true,
+                         path_to_cert);
+}
+
+CouchbaseOperations::Result CouchbaseOperations::authenticateAll(
+    const std::string& username, const std::string& password,
+    const std::string& server_address, const std::string& bucket_name, bool enable_ssl,
+    std::string path_to_cert) {
+  // Create a channel to the Couchbase server
+  brpc::ChannelOptions options;
+  options.protocol = brpc::PROTOCOL_COUCHBASE;
+  options.connection_type = "single";
+  options.timeout_ms = 1000;  // 1 second
+  options.max_retry = 3;
+
+  // CRITICAL: Set connection_group so connections are not shared across
+  // buckets. The group is keyed by server address and bucket name, so
+  // instances that target different buckets use separate connections.
+  options.connection_group = server_address + bucket_name;
+
+  // Configure TLS if requested.
+  if (enable_ssl) {
+    brpc::ChannelSSLOptions* ssl_options = options.mutable_ssl_options();
+    ssl_options->sni_name = server_address;
+    // A non-zero depth enables certificate verification. Setting it to 0
+    // disables verification only; it does not disable SSL.
+    ssl_options->verify.verify_depth = 1;
+    // Path to your downloaded TLS certificate.
+    ssl_options->verify.ca_file_path = path_to_cert;
+  }
+  CouchbaseOperations::Result result;
+  brpc::Channel* new_channel = new brpc::Channel();
+  if (new_channel->Init(server_address.c_str(), &options) != 0) {
+    DEBUG_PRINT("Failed to initialize Couchbase channel to " << server_address);
+    delete new_channel;
+    result.success = false;
+    result.value = "";
+    result.error_message = "Failed to initialize Couchbase channel";
+    return result;
+  }
+  // Create a CouchbaseRequest and CouchbaseResponse for authentication
+  CouchbaseRequest request;
+  CouchbaseResponse response;
+  brpc::Controller cntl;
+  if (request.authenticateRequest(username.c_str(), password.c_str()) ==
+      false) {
+    DEBUG_PRINT("Failed to create Authenticate request for user: " << username);
+    delete new_channel;
+    result.success = false;
+    result.value = "";
+    result.error_message = "Failed to create Authenticate request";
+    return result;
+  }
+  new_channel->CallMethod(NULL, &cntl, &request, &response, NULL);
+  if (cntl.Failed()) {
+    DEBUG_PRINT("Failed to access Couchbase: " << cntl.ErrorText());
+    delete new_channel;
+    result.success = false;
+    result.value = "";
+    result.error_message = cntl.ErrorText();
+    return result;
+  }
+  uint64_t cas;
+  if (response.popHello(&cas) == false) {
+    DEBUG_PRINT("Failed to receive HELLO response from Couchbase: "
+                << response.lastError());
+    delete new_channel;
+    result.success = false;
+    result.value = "";
+    result.error_message = response.lastError();
+    result.status_code = response._status_code;
+    return result;
+  }
+  if (response.popAuthenticate(&cas) == false) {
+    DEBUG_PRINT("Failed to authenticate user: " << username << " to Couchbase: "
+                                                << response.lastError());
+    // Free the channel here too, as the other failure paths do.
+    delete new_channel;
+    result.success = false;
+    result.value = "";
+    result.error_message = response.lastError();
+    result.status_code = response._status_code;
+    return result;
+  }
+  // Successfully authenticated
+  channel_ = new_channel;
+  this->server_address_ = server_address;
+  result.success = true;
+  result.status_code = 0;
+
+  DEBUG_PRINT("Instance " << reinterpret_cast<uintptr_t>(this)
+                          << " authenticated with connection_group: "
+                          << server_address_ + bucket_name);
+
+  // select the bucket
+  result = selectBucket(bucket_name);
+  return result;
+}
+
+CouchbaseOperations::Result CouchbaseOperations::selectBucket(
+    const std::string& bucket_name) {
+  CouchbaseRequest request(&local_bucket_to_collection_manifest_);
+  CouchbaseResponse response;
+  brpc::Controller cntl;
+  CouchbaseOperations::Result result;
+  if (request.selectBucketRequest(bucket_name.c_str()) == false) {
+    DEBUG_PRINT(
+        "Failed to create Select Bucket request for bucket: " << bucket_name);
+    result.success = false;
+    result.value = "";
+    return result;
+  }
+  channel_->CallMethod(NULL, &cntl, &request, &response, NULL);
+  if (cntl.Failed()) {
+    DEBUG_PRINT("Failed to select bucket: "
+                << bucket_name << " from Couchbase: " << cntl.ErrorText());
+    result.success = false;
+    result.value = "";
+    result.error_message = cntl.ErrorText();
+    return result;
+  }
+  if (response.popSelectBucket(NULL) == false) {
+    result.success = false;
+    result.value = "";
+    result.error_message = response.lastError();
+    result.status_code = response._status_code;
+    return result;
+  }
+  // Successfully selected the bucket
+  selected_bucket_ = bucket_name;
+  result.success = true;
+  result.value = "";
+  result.status_code = 0;
+
+  // fetch the collection manifest for this bucket and store it in local cache
+  if (request.local_collection_manifest_cache->find(bucket_name) ==
+      request.local_collection_manifest_cache->end()) {
+    // only fetch if not already present in the local cache
+    CouchbaseManifestManager::CollectionManifest manifest;
+    if (!common_metadata_tracking.getBucketToCollectionManifest(
+            server_address_, bucket_name, &manifest)) {
+      DEBUG_PRINT("Collection manifest for bucket: "
+                  << bucket_name
+                  << " not found in the global or local cache");
+
+      // The manifest for this bucket/server is not cached yet, so fetch it
+      // from the server now. The refresh also updates the local cache with
+      // the fetched manifest.
+      request.metadata_tracking->refreshCollectionManifest(
+          channel_, server_address_, bucket_name,
+          request.local_collection_manifest_cache);
+      // We simply try once to prefetch the manifest, before any collection
+      // operation. If it fails, it will be lazily updated when a collection
+      // operation is performed.
+    } else {
+      // Update the local cache with the manifest from the global cache
+      // (common_metadata_tracking).
+      DEBUG_PRINT("Updated local cache collection manifest for bucket: "
+                  << bucket_name);
+      (*request.local_collection_manifest_cache)[bucket_name] = manifest;
+    }
+  } else {
+    DEBUG_PRINT("Collection manifest for bucket: "
+                << bucket_name << " already present in local cache");
+  }
+  DEBUG_PRINT("Bucket selected successfully " << bucket_name);
+  return result;
+}
+
+CouchbaseOperations::Result CouchbaseOperations::append(
+    const std::string& key, const std::string& value, std::string collection_name) {
+  CouchbaseRequest request(&local_bucket_to_collection_manifest_);
+  CouchbaseResponse response;
+  CouchbaseOperations::Result result;
+  sendRequest(CouchbaseOperations::APPEND, key, value, collection_name, &result,
+              channel_, server_address_, selected_bucket_, &request, &response);
+  return result;
+}
+
+CouchbaseOperations::Result CouchbaseOperations::prepend(
+    const std::string& key, const std::string& value, std::string collection_name) {
+  CouchbaseRequest request(&local_bucket_to_collection_manifest_);
+  CouchbaseResponse response;
+  CouchbaseOperations::Result result;
+  sendRequest(CouchbaseOperations::PREPEND, key, value, collection_name,
+              &result, channel_, server_address_, selected_bucket_, &request,
+              &response);
+  return result;
+}
+
+CouchbaseOperations::Result CouchbaseOperations::version() {
+  CouchbaseRequest request;
+  CouchbaseResponse response;
+  brpc::Controller cntl;
+  CouchbaseOperations::Result result;
+  if (request.versionRequest() == false) {
+    DEBUG_PRINT("Failed to create Version request");
+    result.success = false;
+    result.value = "";
+    return result;
+  }
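+  // Guard against calling version() before authenticate() has set channel_.
+  if (channel_ == nullptr) {
+    DEBUG_PRINT("No channel found, make sure to call authenticate() first");
+    result.success = false;
+    result.value = "";
+    result.error_message =
+        "No channel found, make sure to call authenticate() first";
+    return result;
+  }
+  channel_->CallMethod(NULL, &cntl, &request, &response, NULL);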
+  if (cntl.Failed()) {
+    DEBUG_PRINT("Failed to get version from Couchbase: " << cntl.ErrorText());
+    result.success = false;
+    result.value = "";
+    result.error_message = cntl.ErrorText();
+    return result;
+  }
+  std::string version;
+  if (response.popVersion(&version) == false) {
+    result.success = false;
+    result.value = "";
+    result.error_message = response.lastError();
+    result.status_code = response._status_code;
+    return result;
+  }
+  // Successfully got version
+  result.success = true;
+  result.value = version;
+  result.status_code = 0;
+  return result;
+}
+
+bool CouchbaseOperations::beginPipeline() {
+  if (pipeline_active) {
+    DEBUG_PRINT("Pipeline already active. Call clearPipeline() first.");
+    return false;
+  }
+
+  // Clear any previous state
+  while (!pipeline_operations_queue.empty()) {
+    pipeline_operations_queue.pop();
+  }
+  pipeline_request_couchbase_req.Clear();
+
+  pipeline_active = true;
+  return true;
+}
+
+bool CouchbaseOperations::pipelineRequest(operation_type op_type,
+                                          const std::string& key,
+                                          const std::string& value,
+                                          std::string collection_name) {
+  if (!pipeline_active) {
+    DEBUG_PRINT("Pipeline not active. Call beginPipeline() first.");
+    return false;
+  }
+
+  switch (op_type) {
+    case GET:
+      if (pipeline_request_couchbase_req.getRequest(
+              key, collection_name, channel_, server_address_,
+              selected_bucket_) == false) {
+        return false;
+      }
+      pipeline_operations_queue.push(GET);
+      break;
+    case UPSERT:
+      if (pipeline_request_couchbase_req.upsertRequest(
+              key, value, 0, 0, 0, collection_name, channel_, server_address_,
+              selected_bucket_) == false) {
+        return false;
+      }
+      pipeline_operations_queue.push(UPSERT);
+      break;
+    case ADD:
+      if (pipeline_request_couchbase_req.addRequest(
+              key, value, 0, 0, 0, collection_name, channel_, server_address_,
+              selected_bucket_) == false) {
+        return false;
+      }
+      pipeline_operations_queue.push(ADD);
+      break;
+    case APPEND:
+      if (pipeline_request_couchbase_req.appendRequest(
+              key, value, 0, 0, 0, collection_name, channel_, server_address_,
+              selected_bucket_) == false) {
+        return false;
+      }
+      pipeline_operations_queue.push(APPEND);
+      break;
+    case PREPEND:
+      if (pipeline_request_couchbase_req.prependRequest(
+              key, value, 0, 0, 0, collection_name, channel_, server_address_,
+              selected_bucket_) == false) {
+        return false;
+      }
+      pipeline_operations_queue.push(PREPEND);
+      break;
+    case DELETE:
+      if (pipeline_request_couchbase_req.deleteRequest(
+              key, collection_name, channel_, server_address_,
+              selected_bucket_) == false) {
+        return false;
+      }
+      pipeline_operations_queue.push(DELETE);
+      break;
+    default:
+      DEBUG_PRINT("Invalid operation type for pipelining");
+      return false;
+  }
+  return true;
+}
+
+std::vector<CouchbaseOperations::Result> CouchbaseOperations::executePipeline() {
+  std::vector<CouchbaseOperations::Result> results;
+
+  if (!pipeline_active || pipeline_operations_queue.empty()) {
+    DEBUG_PRINT("No pipeline active or no operations queued");
+    return results;
+  }
+
+  brpc::Controller cntl;
+  channel_->CallMethod(NULL, &cntl, &pipeline_request_couchbase_req,
+                       &pipeline_response_couchbase_resp, NULL);
+
+  if (cntl.Failed()) {
+    DEBUG_PRINT("Pipeline execution failed: " << cntl.ErrorText());
+    // Create failure results for all operations
+    size_t op_count = pipeline_operations_queue.size();
+    results.reserve(op_count);
+
+    CouchbaseOperations::Result failure_result{};  // zero status_code
+    failure_result.success = false;
+    failure_result.error_message = cntl.ErrorText();
+
+    for (size_t i = 0; i < op_count; ++i) {
+      results.push_back(failure_result);
+    }
+
+    clearPipeline();
+    return results;
+  }
+
+  // Process each operation in the order they were added
+  CouchbaseOperations::CouchbaseResponse* response =
+      &pipeline_response_couchbase_resp;
+  while (!pipeline_operations_queue.empty()) {
+    CouchbaseOperations::Result result{};  // value-init: status_code = 0
+    operation_type op_type = pipeline_operations_queue.front();
+    pipeline_operations_queue.pop();
+    switch (op_type) {
+      case GET: {
+        std::string value;
+        uint32_t flags = 0;
+        uint64_t cas = 0;
+        if (response->popGet(&value, &flags, &cas) == false) {
+          result.success = false;
+          result.value = "";
+          result.error_message = response->lastError();
+          result.status_code = response->_status_code;
+        } else {
+          result.success = true;
+          result.value = value;
+        }
+        results.push_back(result);
+        break;
+      }
+      case UPSERT: {
+        if (response->popUpsert(NULL) == false) {
+          result.success = false;
+          result.value = "";
+          result.error_message = response->lastError();
+          result.status_code = response->_status_code;
+        } else {
+          result.success = true;
+          result.value = "";
+        }
+        results.push_back(result);
+        break;
+      }
+      case ADD: {
+        if (response->popAdd(NULL) == false) {
+          result.success = false;
+          result.value = "";
+          result.error_message = response->lastError();
+          result.status_code = response->_status_code;
+        } else {
+          result.success = true;
+          result.value = "";
+        }
+        results.push_back(result);
+        break;
+      }
+      case APPEND: {
+        uint64_t cas_value;
+        if (response->popAppend(&cas_value) == false) {
+          result.success = false;
+          result.value = "";
+          result.error_message = response->lastError();
+          result.status_code = response->_status_code;
+        } else {
+          result.success = true;
+          result.value = "";
+        }
+        results.push_back(result);
+        break;
+      }
+      case PREPEND: {
+        uint64_t cas_value;
+        if (response->popPrepend(&cas_value) == false) {
+          result.success = false;
+          result.value = "";
+          result.error_message = response->lastError();
+          result.status_code = response->_status_code;
+        } else {
+          result.success = true;
+          result.value = "";
+        }
+        results.push_back(result);
+        break;
+      }
+      case DELETE: {
+        if (response->popDelete() == false) {
+          result.success = false;
+          result.value = "";
+          result.error_message = response->lastError();
+          result.status_code = response->_status_code;
+        } else {
+          result.success = true;
+          result.value = "";
+        }
+        results.push_back(result);
+        break;
+      }
+      default:
+        DEBUG_PRINT("Invalid operation type in pipeline response processing");
+        result.success = false;
+        result.value = "";
+        result.error_message = "Invalid operation type";
+        results.push_back(result);
+        break;
+    }
+  }
+
+  pipeline_active = false;
+  pipeline_request_couchbase_req.Clear();
+
+  return results;
+}
+
+bool CouchbaseOperations::clearPipeline() {
+  while (!pipeline_operations_queue.empty()) {
+    pipeline_operations_queue.pop();
+  }
+  pipeline_request_couchbase_req.Clear();
+  pipeline_active = false;
+  return true;
+}
+}  // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/couchbase.h b/src/brpc/couchbase.h
new file mode 100644
index 0000000..3e52f60
--- /dev/null
+++ b/src/brpc/couchbase.h
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+// Couchbase binary-protocol client: request/response messages and the
+// high-level CouchbaseOperations wrapper.
+
+#include <brpc/channel.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <unordered_map>
+
+#include "brpc/nonreflectable_message.h"
+#include "brpc/pb_compat.h"
+#include "butil/iobuf.h"
+#include "butil/strings/string_piece.h"
+
+namespace brpc {
+
+// Forward declarations for friend functions
+class InputMessageBase;
+class Controller;
+namespace policy {
+void ProcessCouchbaseResponse(InputMessageBase* msg);
+void SerializeCouchbaseRequest(butil::IOBuf* buf, Controller* cntl,
+                               const google::protobuf::Message* request);
+}  // namespace policy
+
+// Simple C++11 compatible reader-writer lock
+class ReaderWriterLock {
+ private:
+  std::mutex mutex_;
+  std::condition_variable reader_cv_;
+  std::condition_variable writer_cv_;
+  std::atomic<int> reader_count_;
+  std::atomic<bool> writer_active_;
+  std::atomic<int> waiting_writers_;
+
+ public:
+  ReaderWriterLock()
+      : reader_count_(0), writer_active_(false), waiting_writers_(0) {}
+
+  void lock_shared() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    reader_cv_.wait(lock, [this] {
+      return !writer_active_.load() && waiting_writers_.load() == 0;
+    });
+    reader_count_.fetch_add(1);
+  }
+
+  void unlock_shared() {
+    reader_count_.fetch_sub(1);
+    if (reader_count_.load() == 0) {
+      std::lock_guard<std::mutex> lock(mutex_);
+      writer_cv_.notify_one();
+    }
+  }
+
+  void lock() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    waiting_writers_.fetch_add(1);
+    writer_cv_.wait(lock, [this] {
+      return !writer_active_.load() && reader_count_.load() == 0;
+    });
+    waiting_writers_.fetch_sub(1);
+    writer_active_.store(true);
+  }
+
+  void unlock() {
+    writer_active_.store(false);
+    std::lock_guard<std::mutex> lock(mutex_);
+    writer_cv_.notify_one();
+    reader_cv_.notify_all();
+  }
+};
+
+// RAII helper classes
+class SharedLock {
+ private:
+  ReaderWriterLock& lock_;
+
+ public:
+  explicit SharedLock(ReaderWriterLock& lock) : lock_(lock) {
+    lock_.lock_shared();
+  }
+  ~SharedLock() { lock_.unlock_shared(); }
+};
+
+class UniqueLock {
+ private:
+  ReaderWriterLock& lock_;
+
+ public:
+  explicit UniqueLock(ReaderWriterLock& lock) : lock_(lock) { lock_.lock(); }
+  ~UniqueLock() { lock_.unlock(); }
+};
+
+// Caches collection manifests, keyed by server and then by bucket.
+class CouchbaseManifestManager {
+ public:
+  struct CollectionManifest {
+    std::string uid;  // Manifest UID; can be used to detect whether the
+                      // manifest has been updated.
+    std::unordered_map<std::string, std::unordered_map<std::string, uint8_t>>
+        scope_to_collection_id_map;  // scope -> (collection -> collection_id)
+  };
+
+ private:
+  std::unordered_map<std::string /*server*/,
+                std::unordered_map<std::string /*bucket*/, CollectionManifest>>
+      bucket_to_collection_manifest_;
+  ReaderWriterLock rw_bucket_to_collection_manifest_mutex_;
+
+ public:
+  CouchbaseManifestManager() {}
+  ~CouchbaseManifestManager() { bucket_to_collection_manifest_.clear(); }
+  bool setBucketToCollectionManifest(std::string server, std::string bucket,
+                                     CollectionManifest manifest);
+
+  bool getBucketToCollectionManifest(std::string server, std::string bucket,
+                                     CollectionManifest* manifest);
+  bool getManifestToCollectionId(CollectionManifest* manifest, std::string scope,
+                                 std::string collection, uint8_t* collection_id);
+
+  bool jsonToCollectionManifest(const std::string& json,
+                                CollectionManifest* manifest);
+  bool refreshCollectionManifest(
+      brpc::Channel* channel, const std::string& server, const std::string& bucket,
+      std::unordered_map<std::string, CollectionManifest>* local_cache = nullptr);
+} static common_metadata_tracking;
+class CouchbaseOperations {
+ public:
+  enum operation_type {
+    GET = 1,
+    UPSERT = 2,
+    ADD = 3,
+    REPLACE = 4,
+    APPEND = 5,
+    PREPEND = 6,
+    DELETE = 7
+  };
+  struct Result {
+    bool success;
+    std::string error_message;
+    std::string value;
+    uint16_t status_code;  // 0x00 if success
+  };
+  Result get(const std::string& key, std::string collection_name = "_default");
+  Result upsert(const std::string& key, const std::string& value,
+                std::string collection_name = "_default");
+  Result add(const std::string& key, const std::string& value,
+             std::string collection_name = "_default");
+  // Warning: Not tested
+  // Result replace(const std::string& key, const std::string& value, std::string
+  // collection_name = "_default");
+  Result append(const std::string& key, const std::string& value,
+                std::string collection_name = "_default");
+  Result prepend(const std::string& key, const std::string& value,
+                 std::string collection_name = "_default");
+  Result delete_(const std::string& key, std::string collection_name = "_default");
+  // Warning: Not tested
+  // Result Increment(const string& key, uint64_t delta, uint64_t initial_value,
+  //                  uint32_t exptime, string collection_name = "_default");
+  // Result Decrement(const string& key, uint64_t delta, uint64_t initial_value,
+  //                  uint32_t exptime, string collection_name = "_default");
+  // Result Touch(const string& key, uint32_t exptime, string collection_name = "_default");
+  // Result Flush(uint32_t timeout = 0);
+  Result version();
+  Result authenticateSSL(const std::string& username, const std::string& password,
+                         const std::string& server_address,
+                         const std::string& bucket_name, std::string path_to_cert = "");
+  Result authenticate(const std::string& username, const std::string& password,
+                      const std::string& server_address, const std::string& bucket_name);
+  Result selectBucket(const std::string& bucket_name);
+
+  // Pipeline management
+  bool beginPipeline();
+  bool pipelineRequest(operation_type op_type, const std::string& key,
+                       const std::string& value = "",
+                       std::string collection_name = "_default");
+  std::vector<Result> executePipeline();  // Return by value instead of pointer
+  bool clearPipeline();
+
+  // Pipeline status
+  bool isPipelineActive() const { return pipeline_active; }
+  size_t getPipelineSize() const { return pipeline_operations_queue.size(); }
+
+  CouchbaseOperations()
+      : pipeline_request_couchbase_req(&local_bucket_to_collection_manifest_),
+        pipeline_active(false) {}
+  ~CouchbaseOperations() {}
+  bool getLocalCachedCollectionId(const std::string& bucket, const std::string& scope,
+                                  const std::string& collection, uint8_t* coll_id);
+
+ private:
+  CouchbaseOperations::Result authenticateAll(const std::string& username,
+                                              const std::string& password,
+                                              const std::string& server_address,
+                                              const std::string& bucket_name,
+                                              bool enable_ssl,
+                                              std::string path_to_cert);
+  friend void policy::ProcessCouchbaseResponse(InputMessageBase* msg);
+  friend void policy::SerializeCouchbaseRequest(
+      butil::IOBuf* buf, Controller* cntl,
+      const google::protobuf::Message* request);
+  brpc::Channel* channel_;
+  std::string server_address_;
+  std::string selected_bucket_;
+
+  std::unordered_map<std::string /*bucket*/, CouchbaseManifestManager::CollectionManifest>
+      local_bucket_to_collection_manifest_;
+
+ public:
+  // These classes are public so that users can also build advanced bRPC
+  // programs tailored to their own requirements.
+  class CouchbaseRequest : public NonreflectableMessage<CouchbaseRequest> {
+   public:
+    static brpc::CouchbaseManifestManager* metadata_tracking;
+    int _pipelined_count;
+    butil::IOBuf _buf;
+    mutable int _cached_size_;
+    void sharedCtor();
+    void sharedDtor();
+    void setCachedSize(int size) const;
+    bool getOrDelete(uint8_t command, const butil::StringPiece& key,
+                     uint8_t coll_id = 0);
+    bool counter(uint8_t command, const butil::StringPiece& key, uint64_t delta,
+                 uint64_t initial_value, uint32_t exptime);
+
+    bool store(uint8_t command, const butil::StringPiece& key,
+               const butil::StringPiece& value, uint32_t flags,
+               uint32_t exptime, uint64_t cas_value, uint8_t coll_id = 0);
+    uint32_t hashCrc32(const char* key, size_t key_length);
+
+   public:
+    std::unordered_map<std::string /*bucket*/,
+                  CouchbaseManifestManager::CollectionManifest>*
+        local_collection_manifest_cache = nullptr;
+
+    CouchbaseRequest(
+        std::unordered_map<std::string /*bucket*/,
+                      CouchbaseManifestManager::CollectionManifest>*
+            local_cache_reference)
+        : NonreflectableMessage<CouchbaseRequest>() {
+      metadata_tracking = &common_metadata_tracking;
+      local_collection_manifest_cache = local_cache_reference;
+      sharedCtor();
+    }
+    CouchbaseRequest() : NonreflectableMessage<CouchbaseRequest>() {
+      metadata_tracking = &common_metadata_tracking;
+      sharedCtor();
+    }
+    ~CouchbaseRequest() { sharedDtor(); }
+    CouchbaseRequest(const CouchbaseRequest& from)
+        : NonreflectableMessage<CouchbaseRequest>() {
+      metadata_tracking = &common_metadata_tracking;
+      sharedCtor();
+      MergeFrom(from);
+    }
+
+    inline CouchbaseRequest& operator=(const CouchbaseRequest& from) {
+      if (this != &from) {
+        MergeFrom(from);
+      }
+      return *this;
+    }
+
+    bool selectBucketRequest(const butil::StringPiece& bucket_name);
+    bool authenticateRequest(const butil::StringPiece& username,
+                             const butil::StringPiece& password);
+    bool helloRequest();
+
+    // Using GetCollectionManifest instead of fetching collection ID directly
+    // bool GetCollectionId(const butil::StringPiece& scope_name,
+    //                      const butil::StringPiece& collection_name);
+
+    bool getScopeId(const butil::StringPiece& scope_name);
+
+    bool getCollectionManifest();
+
+    bool getLocalCachedCollectionId(const std::string& bucket, const std::string& scope,
+                                    const std::string& collection, uint8_t* coll_id);
+
+    bool getCachedOrFetchCollectionId(
+        std::string collection_name, uint8_t* coll_id,
+        brpc::CouchbaseManifestManager* metadata_tracking,
+        brpc::Channel* channel, const std::string& server,
+        const std::string& selected_bucket,
+        std::unordered_map<std::string, CouchbaseManifestManager::CollectionManifest>*
+            local_cache);
+
+    // Collection-aware document operations
+    bool getRequest(const butil::StringPiece& key,
+                    std::string collection_name = "_default",
+                    brpc::Channel* channel = nullptr, const std::string& server = "",
+                    const std::string& bucket = "");
+
+    bool upsertRequest(const butil::StringPiece& key,
+                       const butil::StringPiece& value, uint32_t flags,
+                       uint32_t exptime, uint64_t cas_value,
+                       std::string collection_name = "_default",
+                       brpc::Channel* channel = nullptr,
+                       const std::string& server = "", const std::string& bucket = "");
+
+    bool addRequest(const butil::StringPiece& key,
+                    const butil::StringPiece& value, uint32_t flags,
+                    uint32_t exptime, uint64_t cas_value,
+                    std::string collection_name = "_default",
+                    brpc::Channel* channel = nullptr, const std::string& server = "",
+                    const std::string& bucket = "");
+
+    bool appendRequest(const butil::StringPiece& key,
+                       const butil::StringPiece& value, uint32_t flags,
+                       uint32_t exptime, uint64_t cas_value,
+                       std::string collection_name = "_default",
+                       brpc::Channel* channel = nullptr,
+                       const std::string& server = "", const std::string& bucket = "");
+
+    bool prependRequest(const butil::StringPiece& key,
+                        const butil::StringPiece& value, uint32_t flags,
+                        uint32_t exptime, uint64_t cas_value,
+                        std::string collection_name = "_default",
+                        brpc::Channel* channel = nullptr,
+                        const std::string& server = "", const std::string& bucket = "");
+
+    bool deleteRequest(const butil::StringPiece& key,
+                       std::string collection_name = "_default",
+                       brpc::Channel* channel = nullptr,
+                       const std::string& server = "", const std::string& bucket = "");
+
+    bool versionRequest();
+
+    int pipelinedCount() const { return _pipelined_count; }
+
+    butil::IOBuf& rawBuffer() { return _buf; }
+    const butil::IOBuf& rawBuffer() const {
+      return _buf;
+    }  // used in couchbase_protocol serialization.
+    void Swap(CouchbaseRequest* other);
+    void MergeFrom(const CouchbaseRequest& from) override;
+    void Clear() override;
+    bool IsInitialized() const PB_527_OVERRIDE;
+  };
+
+  class CouchbaseResponse : public NonreflectableMessage<CouchbaseResponse> {
+   public:
+    static brpc::CouchbaseManifestManager* metadata_tracking;
+
+   private:
+    std::string _err;
+    butil::IOBuf _buf;
+    mutable int _cached_size_;
+    bool popCounter(uint8_t command, uint64_t* new_value, uint64_t* cas_value);
+    bool popStore(uint8_t command, uint64_t* cas_value);
+
+    void sharedCtor();
+    void sharedDtor();
+    void setCachedSize(int size) const;
+
+   public:
+    uint16_t _status_code;
+
+    CouchbaseResponse() : NonreflectableMessage<CouchbaseResponse>() {
+      sharedCtor();
+    }
+    ~CouchbaseResponse() { sharedDtor(); }
+    CouchbaseResponse(const CouchbaseResponse& from)
+        : NonreflectableMessage<CouchbaseResponse>() {
+      metadata_tracking = &common_metadata_tracking;
+      sharedCtor();
+      MergeFrom(from);
+    }
+    inline CouchbaseResponse& operator=(const CouchbaseResponse& from) {
+      if (this != &from) {
+        MergeFrom(from);
+      }
+      return *this;
+    }
+
+    // Status codes are taken from the Couchbase binary protocol. For the
+    // original definitions, see
+    // https://github.com/couchbase/kv_engine/blob/master/include/mcbp/protocol/status.h
+    enum Status {
+      STATUS_SUCCESS = 0x00,
+      STATUS_KEY_ENOENT = 0x01,
+      STATUS_KEY_EEXISTS = 0x02,
+      STATUS_E2BIG = 0x03,
+      STATUS_EINVAL = 0x04,
+      STATUS_NOT_STORED = 0x05,
+      STATUS_DELTA_BADVAL = 0x06,
+      STATUS_VBUCKET_BELONGS_TO_ANOTHER_SERVER = 0x07,
+      STATUS_AUTH_ERROR = 0x20,
+      STATUS_AUTH_CONTINUE = 0x21,
+      STATUS_ERANGE = 0x22,
+      STATUS_ROLLBACK = 0x23,
+      STATUS_EACCESS = 0x24,
+      STATUS_NOT_INITIALIZED = 0x25,
+      STATUS_UNKNOWN_COMMAND = 0x81,
+      STATUS_ENOMEM = 0x82,
+      STATUS_NOT_SUPPORTED = 0x83,
+      STATUS_EINTERNAL = 0x84,
+      STATUS_EBUSY = 0x85,
+      STATUS_ETMPFAIL = 0x86,
+      STATUS_UNKNOWN_COLLECTION = 0x88,
+      STATUS_NO_COLLECTIONS_MANIFEST = 0x89,
+      STATUS_CANNOT_APPLY_COLLECTIONS_MANIFEST = 0x8a,
+      STATUS_COLLECTIONS_MANIFEST_IS_AHEAD = 0x8b,
+      STATUS_UNKNOWN_SCOPE = 0x8c,
+      STATUS_DCP_STREAM_ID_INVALID = 0x8d,
+      STATUS_DURABILITY_INVALID_LEVEL = 0xa0,
+      STATUS_DURABILITY_IMPOSSIBLE = 0xa1,
+      STATUS_SYNC_WRITE_IN_PROGRESS = 0xa2,
+      STATUS_SYNC_WRITE_AMBIGUOUS = 0xa3,
+      STATUS_SYNC_WRITE_RE_COMMIT_IN_PROGRESS = 0xa4,
+      STATUS_SUBDOC_PATH_NOT_FOUND = 0xc0,
+      STATUS_SUBDOC_PATH_MISMATCH = 0xc1,
+      STATUS_SUBDOC_PATH_EINVAL = 0xc2,
+      STATUS_SUBDOC_PATH_E2BIG = 0xc3,
+      STATUS_SUBDOC_DOC_E2DEEP = 0xc4,
+      STATUS_SUBDOC_VALUE_CANTINSERT = 0xc5,
+      STATUS_SUBDOC_DOC_NOT_JSON = 0xc6,
+      STATUS_SUBDOC_NUM_E2BIG = 0xc7,
+      STATUS_SUBDOC_DELTA_E2BIG = 0xc8,
+      STATUS_SUBDOC_PATH_EEXISTS = 0xc9,
+      STATUS_SUBDOC_VALUE_E2DEEP = 0xca,
+      STATUS_SUBDOC_INVALID_COMBO = 0xcb,
+      STATUS_SUBDOC_MULTI_PATH_FAILURE = 0xcc,
+      STATUS_SUBDOC_SUCCESS_DELETED = 0xcd,
+      STATUS_SUBDOC_XATTR_INVALID_FLAG_COMBO = 0xce,
+      STATUS_SUBDOC_XATTR_INVALID_KEY_COMBO = 0xcf,
+      STATUS_SUBDOC_XATTR_UNKNOWN_MACRO = 0xd0,
+      STATUS_SUBDOC_XATTR_UNKNOWN_VATTR = 0xd1,
+      STATUS_SUBDOC_XATTR_CANT_MODIFY_VATTR = 0xd2,
+      STATUS_SUBDOC_MULTI_PATH_FAILURE_DELETED = 0xd3,
+      STATUS_SUBDOC_INVALID_XATTR_ORDER = 0xd4,
+      STATUS_SUBDOC_XATTR_UNKNOWN_VATTR_MACRO = 0xd5,
+      STATUS_SUBDOC_CAN_ONLY_REVIVE_DELETED_DOCUMENTS = 0xd6,
+      STATUS_SUBDOC_DELETED_DOCUMENT_CANT_HAVE_VALUE = 0xd7,
+      STATUS_XATTR_EINVAL = 0xe0
+    };
+    const char* couchbaseBinaryCommandToString(uint8_t cmd);
+    void MergeFrom(const CouchbaseResponse& from) override;
+    void Clear() override;
+    bool IsInitialized() const PB_527_OVERRIDE;
+
+    butil::IOBuf& rawBuffer() { return _buf; }
+    static const char* statusStr(Status);
+
+    // Helper method to format error messages with status codes
+    static std::string formatErrorMessage(uint16_t status_code,
+                                     const std::string& operation,
+                                     const std::string& error_msg = "");
+
+    // Response-parsing methods.
+    void swap(CouchbaseResponse* other);
+    bool popGet(butil::IOBuf* value, uint32_t* flags, uint64_t* cas_value);
+    bool popGet(std::string* value, uint32_t* flags, uint64_t* cas_value);
+    const std::string& lastError() const { return _err; }
+    bool popUpsert(uint64_t* cas_value);
+    bool popAdd(uint64_t* cas_value);
+    // Warning: Not tested
+    // bool popReplace(uint64_t* cas_value);
+    bool popAppend(uint64_t* cas_value);
+    bool popPrepend(uint64_t* cas_value);
+    bool popSelectBucket(uint64_t* cas_value);
+    bool popAuthenticate(uint64_t* cas_value);
+    bool popHello(uint64_t* cas_value);
+
+    // Collection-related response methods
+    bool popCollectionId(uint8_t* collection_id);
+
+    bool popManifest(std::string* manifest_json);
+
+    bool popDelete();
+    // Warning: Not tested
+    // bool popFlush();
+    // bool popIncrement(uint64_t* new_value, uint64_t* cas_value);
+    // bool popDecrement(uint64_t* new_value, uint64_t* cas_value);
+    // bool popTouch();
+    bool popVersion(std::string* version);
+  };
+
+  friend bool sendRequest(CouchbaseOperations::operation_type op_type,
+                          const std::string& key, const std::string& value,
+                          std::string collection_name,
+                          CouchbaseOperations::Result* result,
+                          brpc::Channel* channel, const std::string& server,
+                          const std::string& bucket, CouchbaseRequest* request,
+                          CouchbaseResponse* response);
+
+  // Pipeline management - per instance
+  std::queue<operation_type> pipeline_operations_queue;
+  CouchbaseRequest pipeline_request_couchbase_req;
+  CouchbaseResponse pipeline_response_couchbase_resp;
+  bool pipeline_active;
+};
+
+}  // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index 0196b6d..82ec32e 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -75,6 +75,7 @@
 #include "brpc/policy/ubrpc2pb_protocol.h"
 #include "brpc/policy/sofa_pbrpc_protocol.h"
 #include "brpc/policy/memcache_binary_protocol.h"
+#include "brpc/policy/couchbase_protocol.h"
 #include "brpc/policy/streaming_rpc_protocol.h"
 #include "brpc/policy/mongo_protocol.h"
 #include "brpc/policy/redis_protocol.h"
@@ -518,6 +519,16 @@
         exit(1);
     }
 
+    Protocol couchbase_protocol = { ParseCouchbaseMessage,
+                                    SerializeCouchbaseRequest,
+                                    PackCouchbaseRequest,
+                                    NULL, ProcessCouchbaseResponse,
+                                    NULL, NULL, GetCouchbaseMethodName,
+                                    CONNECTION_TYPE_ALL, "couchbase" };
+    if (RegisterProtocol(PROTOCOL_COUCHBASE, couchbase_protocol) != 0) {
+        exit(1);
+    }
+
     Protocol redis_protocol = { ParseRedisMessage,
                                 SerializeRedisRequest,
                                 PackRedisRequest,
diff --git a/src/brpc/options.proto b/src/brpc/options.proto
index 34001d7..4ad97aa 100644
--- a/src/brpc/options.proto
+++ b/src/brpc/options.proto
@@ -64,6 +64,7 @@
     PROTOCOL_CDS_AGENT = 24;           // Client side only
     PROTOCOL_ESP = 25;                 // Client side only
     PROTOCOL_H2 = 26;
+    PROTOCOL_COUCHBASE = 27;
 }
 
 enum CompressType {
diff --git a/src/brpc/policy/couchbase_protocol.cpp b/src/brpc/policy/couchbase_protocol.cpp
new file mode 100644
index 0000000..a014581
--- /dev/null
+++ b/src/brpc/policy/couchbase_protocol.cpp
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/couchbase_protocol.h"
+
+#include <gflags/gflags.h>
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+
+#include "brpc/compress.h"    // ParseFromCompressedData
+#include "brpc/controller.h"  // Controller
+#include "brpc/couchbase.h"   // CouchbaseRequest, CouchbaseResponse
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/policy/most_common_message.h"
+#include "brpc/server.h"  // Server
+#include "brpc/socket.h"  // Socket
+#include "brpc/span.h"
+#include "butil/containers/flat_map.h"
+#include "butil/iobuf.h"    // butil::IOBuf
+#include "butil/logging.h"  // LOG()
+#include "butil/sys_byteorder.h"
+#include "butil/time.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+BAIDU_CASSERT(sizeof(CouchbaseRequestHeader) == 24, must_match);
+BAIDU_CASSERT(sizeof(CouchbaseResponseHeader) == 24, must_match);
+
+static uint64_t supported_cmd_map[8];
+static pthread_once_t supported_cmd_map_once = PTHREAD_ONCE_INIT;
+
+static void InitSupportedCommandMap() {
+  butil::bit_array_clear(supported_cmd_map, 256);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_GET);
+  butil::bit_array_set(supported_cmd_map, CB_HELLO_SELECT_FEATURES);
+  butil::bit_array_set(supported_cmd_map, CB_SELECT_BUCKET);
+  butil::bit_array_set(supported_cmd_map, CB_GET_SCOPE_ID);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_SET);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_ADD);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_REPLACE);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_DELETE);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_INCREMENT);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_DECREMENT);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_FLUSH);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_VERSION);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_NOOP);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_APPEND);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_PREPEND);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_STAT);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_TOUCH);
+  butil::bit_array_set(supported_cmd_map, CB_BINARY_SASL_AUTH);
+  // Collection management commands
+  butil::bit_array_set(supported_cmd_map, CB_GET_COLLECTIONS_MANIFEST);
+  butil::bit_array_set(supported_cmd_map, CB_COLLECTIONS_GET_CID);
+  butil::bit_array_set(supported_cmd_map, CB_COLLECTIONS_GET_SCOPE_ID);
+}
+
+inline bool IsSupportedCommand(uint8_t command) {
+  pthread_once(&supported_cmd_map_once, InitSupportedCommandMap);
+  return butil::bit_array_get(supported_cmd_map, command);
+}
+
+ParseResult ParseCouchbaseMessage(butil::IOBuf* source, Socket* socket,
+                                  bool /*read_eof*/, const void* /*arg*/) {
+  while (1) {
+    const uint8_t* p_cbmagic = (const uint8_t*)source->fetch1();
+    if (NULL == p_cbmagic) {
+      return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    if (*p_cbmagic != (uint8_t)CB_MAGIC_RESPONSE) {
+      return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+    }
+    char buf[24];
+    const uint8_t* p = (const uint8_t*)source->fetch(buf, sizeof(buf));
+    if (NULL == p) {
+      return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    const CouchbaseResponseHeader* header = (const CouchbaseResponseHeader*)p;
+    uint32_t total_body_length = butil::NetToHost32(header->total_body_length);
+    if (source->size() < sizeof(*header) + total_body_length) {
+      return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+
+    if (!IsSupportedCommand(header->command)) {
+      LOG(WARNING) << "Unsupported command=" << (int)header->command;
+      source->pop_front(sizeof(*header) + total_body_length);
+      return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+
+    PipelinedInfo pi;
+    if (!socket->PopPipelinedInfo(&pi)) {
+      LOG(WARNING) << "No corresponding PipelinedInfo in socket, drop";
+      source->pop_front(sizeof(*header) + total_body_length);
+      return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    MostCommonMessage* msg =
+        static_cast<MostCommonMessage*>(socket->parsing_context());
+    if (msg == NULL) {
+      msg = MostCommonMessage::Get();
+      socket->reset_parsing_context(msg);
+    }
+
+    // Convert multi-byte header fields from network to host byte order.
+    const CouchbaseResponseHeader local_header = {
+        header->magic,
+        header->command,
+        butil::NetToHost16(header->key_length),
+        header->extras_length,
+        header->data_type,
+        butil::NetToHost16(header->status),
+        total_body_length,
+        butil::NetToHost32(header->opaque),
+        butil::NetToHost64(header->cas_value),
+    };
+    msg->meta.append(&local_header, sizeof(local_header));
+    source->pop_front(sizeof(*header));
+    source->cutn(&msg->meta, total_body_length);
+    if (++msg->pi.count >= pi.count) {
+      CHECK_EQ(msg->pi.count, pi.count);
+      msg = static_cast<MostCommonMessage*>(socket->release_parsing_context());
+      msg->pi = pi;
+      return MakeMessage(msg);
+    } else {
+      socket->GivebackPipelinedInfo(pi);
+    }
+  }
+}
+
+void ProcessCouchbaseResponse(InputMessageBase* msg_base) {
+  const int64_t start_parse_us = butil::cpuwide_time_us();
+  DestroyingPtr<MostCommonMessage> msg(
+      static_cast<MostCommonMessage*>(msg_base));
+
+  const bthread_id_t cid = msg->pi.id_wait;
+  Controller* cntl = NULL;
+  const int rc = bthread_id_lock(cid, (void**)&cntl);
+  if (rc != 0) {
+    LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
+        << "Fail to lock correlation_id=" << cid << ": " << berror(rc);
+    return;
+  }
+
+  ControllerPrivateAccessor accessor(cntl);
+  Span* span = accessor.span();
+  if (span) {
+    span->set_base_real_us(msg->base_real_us());
+    span->set_received_us(msg->received_us());
+    span->set_response_size(msg->meta.length());
+    span->set_start_parse_us(start_parse_us);
+  }
+  const int saved_error = cntl->ErrorCode();
+  if (cntl->response() == NULL) {
+    cntl->SetFailed(ERESPONSE, "response is NULL!");
+  } else if (cntl->response()->GetDescriptor() !=
+             CouchbaseOperations::CouchbaseResponse::descriptor()) {
+    cntl->SetFailed(ERESPONSE, "Must be CouchbaseResponse");
+  } else {
+    // We work around ParseFrom of pb which is just a placeholder.
+    ((CouchbaseOperations::CouchbaseResponse*)cntl->response())->rawBuffer() =
+        msg->meta.movable();
+    if (msg->pi.count != accessor.pipelined_count()) {
+      cntl->SetFailed(ERESPONSE,
+                      "pipelined_count=%d of response does "
+                      "not equal request's=%d",
+                      msg->pi.count, accessor.pipelined_count());
+    }
+  }
+  // Unlocks correlation_id inside. Reverts controller's
+  // error code if the version check of `cid' fails.
+  msg.reset();  // optional, just release resource ASAP
+  accessor.OnResponse(cid, saved_error);
+}
+
+void SerializeCouchbaseRequest(butil::IOBuf* buf, Controller* cntl,
+                               const google::protobuf::Message* request) {
+  if (request == NULL) {
+    return cntl->SetFailed(EREQUEST, "request is NULL");
+  }
+  if (request->GetDescriptor() !=
+      CouchbaseOperations::CouchbaseRequest::descriptor()) {
+    return cntl->SetFailed(EREQUEST, "Must be CouchbaseRequest");
+  }
+  const CouchbaseOperations::CouchbaseRequest* mr =
+      (const CouchbaseOperations::CouchbaseRequest*)request;
+  // We work around SerializeTo of pb which is just a placeholder.
+  *buf = mr->rawBuffer();
+  ControllerPrivateAccessor(cntl).set_pipelined_count(mr->pipelinedCount());
+}
+
+void PackCouchbaseRequest(butil::IOBuf* buf, SocketMessage**,
+                          uint64_t /*correlation_id*/,
+                          const google::protobuf::MethodDescriptor*,
+                          Controller* cntl, const butil::IOBuf& request,
+                          const Authenticator* auth) {
+  if (auth) {
+    std::string auth_str;
+    if (auth->GenerateCredential(&auth_str) != 0) {
+      return cntl->SetFailed(EREQUEST, "Fail to generate credential");
+    }
+    if (auth_str.empty()) {
+      return cntl->SetFailed(EREQUEST, "Empty auth_str");
+    }
+    buf->append(auth_str);
+    // pipelined_count();
+  } else {
+    buf->append(request);
+  }
+}
+
+const std::string& GetCouchbaseMethodName(
+    const google::protobuf::MethodDescriptor*, const Controller*) {
+  static const std::string COUCHBASE_STR = "Couchbase";
+  return COUCHBASE_STR;
+}
+
+}  // namespace policy
+}  // namespace brpc
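Reviewer note: the header handling in ParseCouchbaseMessage above can be sketched without any brpc dependency. The following is a minimal illustration, assuming the 24-byte layout declared in couchbase_protocol.h; `DecodedResponseHeader`, `ReadBigEndian` and `DecodeResponseHeader` are hypothetical names that do not exist in this patch, and the sketch reads bytes individually instead of casting the buffer to a struct.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for CouchbaseResponseHeader, holding host-order values.
struct DecodedResponseHeader {
  uint8_t magic;
  uint8_t command;
  uint16_t key_length;
  uint8_t extras_length;
  uint8_t data_type;
  uint16_t status;
  uint32_t total_body_length;
  uint32_t opaque;
  uint64_t cas_value;
};

// Size of the fixed header on the wire.
const size_t kHeaderSize = 24;

// Reads an unsigned big-endian (network order) integer of `n` bytes at `p`.
inline uint64_t ReadBigEndian(const uint8_t* p, size_t n) {
  assert(n <= 8);
  uint64_t v = 0;
  for (size_t i = 0; i < n; ++i) {
    v = (v << 8) | p[i];
  }
  return v;
}

// Decodes a response header into host byte order. Returns false when fewer
// than 24 bytes are available or the magic byte is not 0x81 (response).
inline bool DecodeResponseHeader(const uint8_t* data, size_t size,
                                 DecodedResponseHeader* out) {
  if (size < kHeaderSize || data[0] != 0x81) {
    return false;
  }
  out->magic = data[0];
  out->command = data[1];
  out->key_length = static_cast<uint16_t>(ReadBigEndian(data + 2, 2));
  out->extras_length = data[4];
  out->data_type = data[5];
  out->status = static_cast<uint16_t>(ReadBigEndian(data + 6, 2));
  out->total_body_length = static_cast<uint32_t>(ReadBigEndian(data + 8, 4));
  out->opaque = static_cast<uint32_t>(ReadBigEndian(data + 12, 4));
  out->cas_value = ReadBigEndian(data + 16, 8);
  return true;
}
```

A caller would pass the first 24 bytes of a buffered response and then wait until `total_body_length` further bytes are available, which mirrors the `source->size() < sizeof(*header) + total_body_length` check in the parser.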
diff --git a/src/brpc/policy/couchbase_protocol.h b/src/brpc/policy/couchbase_protocol.h
new file mode 100644
index 0000000..15367de
--- /dev/null
+++ b/src/brpc/policy/couchbase_protocol.h
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_POLICY_COUCHBASE_BINARY_PROTOCOL_H
+#define BRPC_POLICY_COUCHBASE_BINARY_PROTOCOL_H
+
+#include "brpc/protocol.h"
+
+namespace brpc {
+namespace policy {
+
+enum CouchbaseMagic { CB_MAGIC_REQUEST = 0x80, CB_MAGIC_RESPONSE = 0x81 };
+
+// Definition of the data types in the packet
+// https://github.com/couchbase/kv_engine/blob/master/docs/BinaryProtocol.md
+enum CouchbaseBinaryDataType { CB_BINARY_RAW_BYTES = 0x00 };
+
+enum CouchbaseJsonDataType { CB_JSON = 0x01 };
+
+// Definition of the different command opcodes.
+// https://github.com/couchbase/kv_engine/blob/master/docs/BinaryProtocol.md
+enum CouchbaseBinaryCommand {
+  CB_HELLO_SELECT_FEATURES = 0x1f,
+  CB_SELECT_BUCKET = 0x89,
+  CB_GET_SCOPE_ID = 0xBC,  // Same opcode as CB_COLLECTIONS_GET_SCOPE_ID
+  CB_BINARY_GET = 0x00,
+  CB_BINARY_SET = 0x01,
+  CB_BINARY_ADD = 0x02,
+  CB_BINARY_REPLACE = 0x03,
+  CB_BINARY_DELETE = 0x04,
+  CB_BINARY_INCREMENT = 0x05,
+  CB_BINARY_DECREMENT = 0x06,
+  CB_BINARY_QUIT = 0x07,
+  CB_BINARY_FLUSH = 0x08,
+  CB_BINARY_GETQ = 0x09,
+  CB_BINARY_NOOP = 0x0a,
+  CB_BINARY_VERSION = 0x0b,
+  CB_BINARY_GETK = 0x0c,
+  CB_BINARY_GETKQ = 0x0d,
+  CB_BINARY_APPEND = 0x0e,
+  CB_BINARY_PREPEND = 0x0f,
+  CB_BINARY_STAT = 0x10,
+  CB_BINARY_SETQ = 0x11,
+  CB_BINARY_ADDQ = 0x12,
+  CB_BINARY_REPLACEQ = 0x13,
+  CB_BINARY_DELETEQ = 0x14,
+  CB_BINARY_INCREMENTQ = 0x15,
+  CB_BINARY_DECREMENTQ = 0x16,
+  CB_BINARY_QUITQ = 0x17,
+  CB_BINARY_FLUSHQ = 0x18,
+  CB_BINARY_APPENDQ = 0x19,
+  CB_BINARY_PREPENDQ = 0x1a,
+  CB_BINARY_TOUCH = 0x1c,
+  CB_BINARY_GAT = 0x1d,
+  CB_BINARY_GATQ = 0x1e,
+  CB_BINARY_GATK = 0x23,
+  CB_BINARY_GATKQ = 0x24,
+
+  CB_BINARY_SASL_LIST_MECHS = 0x20,
+  CB_BINARY_SASL_AUTH = 0x21,
+  CB_BINARY_SASL_STEP = 0x22,
+
+  // Collection Management Commands (Couchbase 7.0+)
+  CB_GET_CLUSTER_CONFIG = 0xb5,
+  CB_GET_COLLECTIONS_MANIFEST = 0xba,
+  CB_COLLECTIONS_GET_CID = 0xbb,
+  CB_COLLECTIONS_GET_SCOPE_ID = 0xbc,
+
+};
+
+struct CouchbaseRequestHeader {
+  // Magic number identifying the packet (See Couchbase Binary
+  // Protocol#Magic_Byte)
+  uint8_t magic;
+
+  // Command code (See Couchbase Binary Protocol#Command_opcodes)
+  uint8_t command;
+
+  // Length in bytes of the text key that follows the command extras
+  uint16_t key_length;
+
+  // Length in bytes of the command extras
+  uint8_t extras_length;
+
+  // Type of the value, e.g. CB_BINARY_RAW_BYTES or CB_JSON
+  uint8_t data_type;
+
+  // The virtual bucket for this command
+  uint16_t vbucket_id;
+
+  // Length in bytes of extra + key + value
+  uint32_t total_body_length;
+
+  // Will be copied back to you in the response
+  uint32_t opaque;
+
+  // Data version check
+  uint64_t cas_value;
+};
+
+struct CouchbaseResponseHeader {
+  // Magic number identifying the packet (See Couchbase Binary
+  // Protocol#Magic_Byte)
+  uint8_t magic;
+
+  // Command code (See Couchbase Binary Protocol#Command_opcodes)
+  uint8_t command;
+
+  // Length in bytes of the text key that follows the command extras
+  uint16_t key_length;
+
+  // Length in bytes of the command extras
+  uint8_t extras_length;
+
+  // Type of the value, e.g. CB_BINARY_RAW_BYTES or CB_JSON
+  uint8_t data_type;
+
+  // Status of the response (non-zero on error)
+  uint16_t status;
+
+  // Length in bytes of extra + key + value
+  uint32_t total_body_length;
+
+  // Will be copied back to you in the response
+  uint32_t opaque;
+
+  // Data version check
+  uint64_t cas_value;
+};
+
+// Parse couchbase messages.
+ParseResult ParseCouchbaseMessage(butil::IOBuf* source, Socket* socket,
+                                  bool read_eof, const void* arg);
+
+// Actions to a couchbase response.
+void ProcessCouchbaseResponse(InputMessageBase* msg);
+
+// Serialize a couchbase request.
+void SerializeCouchbaseRequest(butil::IOBuf* buf, Controller* cntl,
+                               const google::protobuf::Message* request);
+
+// Pack `request' to `method' into `buf'.
+void PackCouchbaseRequest(butil::IOBuf* buf, SocketMessage**,
+                          uint64_t correlation_id,
+                          const google::protobuf::MethodDescriptor* method,
+                          Controller* controller, const butil::IOBuf& request,
+                          const Authenticator* auth);
+
+// Process a couchbase request.
+// Not implemented, since there is no server-side instance to serve it:
+// void ProcessCouchbaseRequest(InputMessageBase* msg);
+
+const std::string& GetCouchbaseMethodName(
+    const google::protobuf::MethodDescriptor*, const Controller*);
+
+}  // namespace policy
+}  // namespace brpc
+
+#endif  // BRPC_POLICY_COUCHBASE_BINARY_PROTOCOL_H
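Reviewer note: to make the CouchbaseRequestHeader layout above concrete, here is a minimal sketch of how a key-only request (for example a plain GET, opcode 0x00, which carries no extras and no value) could be laid out on the wire. `AppendBigEndian` and `EncodeKeyOnlyRequest` are hypothetical helpers, not part of this patch, and the sketch does not model any collection-ID prefix that the real request builders may add to the key.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper: appends the `n` low-order bytes of `v` to `out` in
// big-endian (network) order.
inline void AppendBigEndian(std::string* out, uint64_t v, size_t n) {
  assert(n >= 1 && n <= 8);
  for (size_t i = 0; i < n; ++i) {
    out->push_back(static_cast<char>((v >> (8 * (n - 1 - i))) & 0xff));
  }
}

// Builds the wire bytes of a request with no extras and no value:
// a 24-byte header followed by the key.
inline std::string EncodeKeyOnlyRequest(uint8_t opcode, uint16_t vbucket_id,
                                        uint32_t opaque,
                                        const std::string& key) {
  assert(key.size() <= 0xffff);  // key_length is a 16-bit field
  std::string out;
  out.push_back(static_cast<char>(0x80));    // magic (CB_MAGIC_REQUEST)
  out.push_back(static_cast<char>(opcode));  // command
  AppendBigEndian(&out, key.size(), 2);      // key_length
  out.push_back(0);                          // extras_length
  out.push_back(0);                          // data_type (raw bytes)
  AppendBigEndian(&out, vbucket_id, 2);      // vbucket_id
  AppendBigEndian(&out, key.size(), 4);      // total_body_length = 0 + key + 0
  AppendBigEndian(&out, opaque, 4);          // opaque
  AppendBigEndian(&out, 0, 8);               // cas_value
  out += key;
  return out;
}
```

The point of the sketch is the relationship between the length fields: `total_body_length` always covers extras, key and value together, so for a key-only request it equals `key_length`.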
diff --git a/src/brpc/proto_base.proto b/src/brpc/proto_base.proto
index b278ddb..3fcdda0 100644
--- a/src/brpc/proto_base.proto
+++ b/src/brpc/proto_base.proto
@@ -25,6 +25,9 @@
 
 message EspMessageBase {}
 
+message CouchbaseRequestBase {}
+message CouchbaseResponseBase {}
+
 message MemcacheRequestBase {}
 message MemcacheResponseBase {}
 
diff --git a/test/brpc_couchbase_unittest.cpp b/test/brpc_couchbase_unittest.cpp
new file mode 100644
index 0000000..dacc9a0
--- /dev/null
+++ b/test/brpc_couchbase_unittest.cpp
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <brpc/couchbase.h>
+#include <butil/logging.h>
+#include <gtest/gtest.h>
+
+namespace brpc {
+DECLARE_int32(idle_timeout_second);
+}
+
+int main(int argc, char* argv[]) {
+  brpc::FLAGS_idle_timeout_second = 0;
+  testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+namespace {
+
+// Unit Tests - No Server Required
+class CouchbaseUnitTest : public testing::Test {};
+
+TEST_F(CouchbaseUnitTest, RequestBuilders) {
+  brpc::CouchbaseOperations::CouchbaseRequest req;
+  req.Clear();
+  req.helloRequest();
+  req.authenticateRequest("user", "pass");
+  req.selectBucketRequest("bucket");
+  req.addRequest("key", "value", 0, 0, 0);
+  req.getRequest("key");
+  req.upsertRequest("key", "value", 0, 0, 0);
+  req.deleteRequest("key");
+  EXPECT_TRUE(true);  // Smoke test: passes if the builders do not crash.
+}
+
+TEST_F(CouchbaseUnitTest, ResultStruct) {
+  brpc::CouchbaseOperations::Result result;
+
+  result.success = true;
+  result.error_message = "Test";
+  result.value = R"({"test": "data"})";
+  result.status_code = 0x01;
+
+  EXPECT_TRUE(result.success);
+  EXPECT_EQ("Test", result.error_message);
+  EXPECT_EQ(R"({"test": "data"})", result.value);
+  EXPECT_EQ(0x01, result.status_code);
+
+  result.success = false;
+  result.error_message = "";
+  result.value = "";
+  result.status_code = 0x00;
+
+  EXPECT_FALSE(result.success);
+  EXPECT_TRUE(result.error_message.empty());
+  EXPECT_TRUE(result.value.empty());
+  EXPECT_EQ(0x00, result.status_code);
+}
+
+TEST_F(CouchbaseUnitTest, EdgeCases) {
+  brpc::CouchbaseOperations::CouchbaseRequest req;
+  req.addRequest("", "value", 0, 0, 0);
+  req.addRequest("key", "", 0, 0, 0);
+  req.addRequest(std::string(1000, 'x'), "val", 0, 0, 0);
+  req.addRequest("key", std::string(10000, 'x'), 0, 0, 0);
+  req.addRequest("test::special::!!!", "val", 0, 0, 0);
+  req.addRequest("key", R"({"unicode":"123"})", 0, 0, 0);
+  EXPECT_TRUE(true);  // Smoke test: passes if the builders do not crash.
+}
+
+}  // namespace
\ No newline at end of file