[feat] Support end-to-end encryption in C Reader API (#262)

(cherry picked from commit 043c88b4d62691b450c6302b1f718b56a4388ce0)
diff --git a/include/pulsar/c/reader_configuration.h b/include/pulsar/c/reader_configuration.h
index 66ce8ef..a409226 100644
--- a/include/pulsar/c/reader_configuration.h
+++ b/include/pulsar/c/reader_configuration.h
@@ -23,6 +23,8 @@
 #include <pulsar/c/reader.h>
 #include <pulsar/defines.h>
 
+#include "consumer_configuration.h"
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -89,6 +91,16 @@
 
 PULSAR_PUBLIC int pulsar_reader_configuration_is_read_compacted(pulsar_reader_configuration_t *configuration);
 
+PULSAR_PUBLIC void pulsar_reader_configuration_set_default_crypto_key_reader(
+    pulsar_reader_configuration_t *configuration, const char *public_key_path, const char *private_key_path);
+
+PULSAR_PUBLIC pulsar_consumer_crypto_failure_action
+pulsar_reader_configuration_get_crypto_failure_action(pulsar_reader_configuration_t *configuration);
+
+PULSAR_PUBLIC void pulsar_reader_configuration_set_crypto_failure_action(
+    pulsar_reader_configuration_t *configuration,
+    pulsar_consumer_crypto_failure_action crypto_failure_action);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/c/c_ReaderConfiguration.cc b/lib/c/c_ReaderConfiguration.cc
index fe05cf2..b7aa9a9 100644
--- a/lib/c/c_ReaderConfiguration.cc
+++ b/lib/c/c_ReaderConfiguration.cc
@@ -86,3 +86,22 @@
 int pulsar_reader_configuration_is_read_compacted(pulsar_reader_configuration_t *configuration) {
     return configuration->conf.isReadCompacted();
 }
+
+void pulsar_reader_configuration_set_default_crypto_key_reader(pulsar_reader_configuration_t *configuration,
+                                                               const char *public_key_path,
+                                                               const char *private_key_path) {
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(public_key_path, private_key_path);
+    configuration->conf.setCryptoKeyReader(keyReader);
+}
+
+pulsar_consumer_crypto_failure_action pulsar_reader_configuration_get_crypto_failure_action(
+    pulsar_reader_configuration_t *configuration) {
+    return (pulsar_consumer_crypto_failure_action)configuration->conf.getCryptoFailureAction();
+}
+
+void pulsar_reader_configuration_set_crypto_failure_action(
+    pulsar_reader_configuration_t *configuration,
+    pulsar_consumer_crypto_failure_action crypto_failure_action) {
+    configuration->conf.setCryptoFailureAction((pulsar::ConsumerCryptoFailureAction)crypto_failure_action);
+}
diff --git a/tests/c/c_ReaderConfigurationTest.cc b/tests/c/c_ReaderConfigurationTest.cc
new file mode 100644
index 0000000..fd21646
--- /dev/null
+++ b/tests/c/c_ReaderConfigurationTest.cc
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/c/reader_configuration.h>
+
+#include <climits>
+
+TEST(C_ReaderConfigurationTest, testCApiConfig) {
+    pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create();
+
+    ASSERT_FALSE(pulsar_reader_configuration_has_reader_listener(reader_conf));
+
+    ASSERT_EQ(pulsar_reader_configuration_get_receiver_queue_size(reader_conf), 1000);
+    pulsar_reader_configuration_set_receiver_queue_size(reader_conf, 1729);
+    ASSERT_EQ(pulsar_reader_configuration_get_receiver_queue_size(reader_conf), 1729);
+
+    ASSERT_STREQ(pulsar_reader_configuration_get_subscription_role_prefix(reader_conf), "");
+    pulsar_reader_configuration_set_subscription_role_prefix(reader_conf, "prefix");
+    ASSERT_STREQ(pulsar_reader_configuration_get_subscription_role_prefix(reader_conf), "prefix");
+
+    ASSERT_STREQ(pulsar_reader_configuration_get_reader_name(reader_conf), "");
+    pulsar_reader_configuration_set_reader_name(reader_conf, "reader");
+    ASSERT_STREQ(pulsar_reader_configuration_get_reader_name(reader_conf), "reader");
+
+    ASSERT_FALSE(pulsar_reader_configuration_is_read_compacted(reader_conf));
+    pulsar_reader_configuration_set_read_compacted(reader_conf, true);
+    ASSERT_TRUE(pulsar_reader_configuration_is_read_compacted(reader_conf));
+
+    ASSERT_EQ(pulsar_reader_configuration_get_crypto_failure_action(reader_conf), pulsar_ConsumerFail);
+    pulsar_reader_configuration_set_crypto_failure_action(reader_conf, pulsar_ConsumerDiscard);
+    ASSERT_EQ(pulsar_reader_configuration_get_crypto_failure_action(reader_conf), pulsar_ConsumerDiscard);
+
+    pulsar_reader_configuration_free(reader_conf);
+}