/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "fory/python/pyfory.h"

#include <algorithm>
#include <cstring>
#include <exception>
#include <limits>
#include <memory>
#include <vector>

#include "fory/type/type.h"
#include "fory/util/stream.h"
#include "fory/util/string_util.h"

#if PY_VERSION_HEX < 0x030A0000
static inline PyObject *Fory_PyNewRefCompat(PyObject *obj) {
  Py_INCREF(obj);
  return obj;
}
#define FORY_PY_NEWREF(obj) Fory_PyNewRefCompat(obj)
#define FORY_PY_SET_CHECK_EXACT(obj) (Py_TYPE(obj) == &PySet_Type)
#else
#define FORY_PY_NEWREF(obj) Py_NewRef(obj)
#define FORY_PY_SET_CHECK_EXACT(obj) PySet_CheckExact(obj)
#endif

static PyObject **py_sequence_get_items(PyObject *collection) {
  if (PyList_CheckExact(collection)) {
    return ((PyListObject *)collection)->ob_item;
  } else if (PyTuple_CheckExact(collection)) {
    return ((PyTupleObject *)collection)->ob_item;
  }
  return nullptr;
}

namespace fory {

static std::string fetch_python_error_message() {
  PyObject *type = nullptr;
  PyObject *value = nullptr;
  PyObject *traceback = nullptr;
  PyErr_Fetch(&type, &value, &traceback);
  PyErr_NormalizeException(&type, &value, &traceback);
  std::string message = "python stream read failed";
  if (value != nullptr) {
    PyObject *value_str = PyObject_Str(value);
    if (value_str != nullptr) {
      const char *c_str = PyUnicode_AsUTF8(value_str);
      if (c_str != nullptr) {
        message = c_str;
      }
      Py_DECREF(value_str);
    } else {
      PyErr_Clear();
    }
  }
  Py_XDECREF(type);
  Py_XDECREF(value);
  Py_XDECREF(traceback);
  return message;
}

enum class PythonStreamReadMethod {
  ReadInto,
  RecvInto,
  RecvIntoUnderscore,
};

static const char *
python_stream_read_method_name(PythonStreamReadMethod method) {
  switch (method) {
  case PythonStreamReadMethod::ReadInto:
    return "readinto";
  case PythonStreamReadMethod::RecvInto:
    return "recvinto";
  case PythonStreamReadMethod::RecvIntoUnderscore:
    return "recv_into";
  }
  return "readinto";
}

static bool resolve_python_stream_read_method(PyObject *stream,
                                              PythonStreamReadMethod *method,
                                              std::string *error_message) {
  struct MethodCandidate {
    const char *name;
    PythonStreamReadMethod method;
  };
  constexpr MethodCandidate k_candidates[] = {
      {"readinto", PythonStreamReadMethod::ReadInto},
      {"recv_into", PythonStreamReadMethod::RecvIntoUnderscore},
      {"recvinto", PythonStreamReadMethod::RecvInto},
  };
  for (const auto &candidate : k_candidates) {
    const int has_method = PyObject_HasAttrString(stream, candidate.name);
    if (has_method < 0) {
      *error_message = fetch_python_error_message();
      return false;
    }
    if (has_method == 0) {
      continue;
    }
    PyObject *method_obj = PyObject_GetAttrString(stream, candidate.name);
    if (method_obj == nullptr) {
      *error_message = fetch_python_error_message();
      return false;
    }
    const bool is_callable = PyCallable_Check(method_obj) != 0;
    Py_DECREF(method_obj);
    if (is_callable) {
      *method = candidate.method;
      return true;
    }
  }
  *error_message = "stream object must provide readinto(buffer), "
                   "recv_into(buffer, size) or recvinto(buffer, size) method";
  return false;
}

static bool resolve_python_stream_write_method(PyObject *stream,
                                               std::string *error_message) {
  const int has_write = PyObject_HasAttrString(stream, "write");
  if (has_write < 0) {
    *error_message = fetch_python_error_message();
    return false;
  }
  if (has_write == 0) {
    *error_message = "stream object must provide write(data) method";
    return false;
  }
  PyObject *method_obj = PyObject_GetAttrString(stream, "write");
  if (method_obj == nullptr) {
    *error_message = fetch_python_error_message();
    return false;
  }
  const bool is_callable = PyCallable_Check(method_obj) != 0;
  Py_DECREF(method_obj);
  if (!is_callable) {
    *error_message = "stream.write must be callable";
    return false;
  }
  return true;
}

class PyOutputStream final : public OutputStream {
public:
  explicit PyOutputStream(PyObject *stream, uint32_t buffer_size = 4096)
      : OutputStream(buffer_size), stream_(stream) {
    FORY_CHECK(stream_ != nullptr) << "stream must not be null";
    Py_INCREF(stream_);
  }

  ~PyOutputStream() override {
    if (stream_ != nullptr) {
      PyGILState_STATE gil_state = PyGILState_Ensure();
      Py_DECREF(stream_);
      PyGILState_Release(gil_state);
      stream_ = nullptr;
    }
  }

protected:
  Result<void, Error> write_to_stream(const uint8_t *src,
                                      uint32_t length) override {
    if (length == 0) {
      return Result<void, Error>();
    }
    if (src == nullptr) {
      return Unexpected(Error::invalid("output source pointer is null"));
    }
    if (stream_ == nullptr) {
      return Unexpected(Error::io_error("output stream is null"));
    }
    PyGILState_STATE gil_state = PyGILState_Ensure();
    uint32_t total_written = 0;
    while (total_written < length) {
      const uint32_t remaining = length - total_written;
      // Contract: stream.write must consume bytes synchronously before return.
      // The memoryview below is a transient view over serializer-managed
      // storage and is not safe to retain after write(...) returns.
      PyObject *chunk = PyMemoryView_FromMemory(
          reinterpret_cast<char *>(
              const_cast<uint8_t *>(src + static_cast<size_t>(total_written))),
          static_cast<Py_ssize_t>(remaining), PyBUF_READ);
      if (chunk == nullptr) {
        const std::string message = fetch_python_error_message();
        PyGILState_Release(gil_state);
        return Unexpected(Error::io_error(message));
      }
      PyObject *written_obj = PyObject_CallMethod(stream_, "write", "O", chunk);
      Py_DECREF(chunk);
      if (written_obj == nullptr) {
        const std::string message = fetch_python_error_message();
        PyGILState_Release(gil_state);
        return Unexpected(Error::io_error(message));
      }
      if (written_obj == Py_None) {
        Py_DECREF(written_obj);
        total_written = length;
        break;
      }
      const long long wrote_value = PyLong_AsLongLong(written_obj);
      Py_DECREF(written_obj);
      if (wrote_value == -1 && PyErr_Occurred() != nullptr) {
        const std::string message = fetch_python_error_message();
        PyGILState_Release(gil_state);
        return Unexpected(Error::io_error(message));
      }
      if (wrote_value <= 0) {
        PyGILState_Release(gil_state);
        return Unexpected(
            Error::io_error("stream write returned non-positive bytes"));
      }
      const uint64_t wrote_u64 = static_cast<uint64_t>(wrote_value);
      if (wrote_u64 >= remaining) {
        total_written = length;
      } else {
        total_written += static_cast<uint32_t>(wrote_u64);
      }
    }
    PyGILState_Release(gil_state);
    return Result<void, Error>();
  }

  Result<void, Error> flush_stream() override {
    if (stream_ == nullptr) {
      return Unexpected(Error::io_error("output stream is null"));
    }
    PyGILState_STATE gil_state = PyGILState_Ensure();
    const int has_flush = PyObject_HasAttrString(stream_, "flush");
    if (has_flush < 0) {
      const std::string message = fetch_python_error_message();
      PyGILState_Release(gil_state);
      return Unexpected(Error::io_error(message));
    }
    if (has_flush == 0) {
      PyGILState_Release(gil_state);
      return Result<void, Error>();
    }
    PyObject *result = PyObject_CallMethod(stream_, "flush", nullptr);
    if (result == nullptr) {
      const std::string message = fetch_python_error_message();
      PyGILState_Release(gil_state);
      return Unexpected(Error::io_error(message));
    }
    Py_DECREF(result);
    PyGILState_Release(gil_state);
    return Result<void, Error>();
  }

private:
  PyObject *stream_ = nullptr;
};

class PyInputStream final : public InputStream {
public:
  explicit PyInputStream(PyObject *stream, uint32_t buffer_size,
                         PythonStreamReadMethod read_method)
      : stream_(stream), read_method_(read_method),
        read_method_name_(python_stream_read_method_name(read_method)),
        data_(std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
        initial_buffer_size_(
            std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
        owned_buffer_(std::make_unique<Buffer>()) {
    FORY_CHECK(stream_ != nullptr) << "stream must not be null";
    Py_INCREF(stream_);
    bind_buffer(owned_buffer_.get());
  }

  ~PyInputStream() override {
    if (stream_ != nullptr) {
      PyGILState_STATE gil_state = PyGILState_Ensure();
      Py_DECREF(stream_);
      PyGILState_Release(gil_state);
      stream_ = nullptr;
    }
  }

  Result<void, Error> fill_buffer(uint32_t min_fill_size) override {
    if (min_fill_size == 0 || remaining_size() >= min_fill_size) {
      return Result<void, Error>();
    }

    const uint32_t read_pos = buffer_->reader_index_;
    const uint32_t deficit = min_fill_size - remaining_size();
    constexpr uint64_t k_max_u32 = std::numeric_limits<uint32_t>::max();
    const uint64_t required = static_cast<uint64_t>(buffer_->size_) + deficit;
    if (required > k_max_u32) {
      return Unexpected(
          Error::out_of_bound("stream buffer size exceeds uint32 range"));
    }
    if (required > data_.size()) {
      uint64_t new_size =
          std::max<uint64_t>(required, static_cast<uint64_t>(data_.size()) * 2);
      if (new_size > k_max_u32) {
        new_size = k_max_u32;
      }
      reserve(static_cast<uint32_t>(new_size));
    }

    uint32_t write_pos = buffer_->size_;
    while (remaining_size() < min_fill_size) {
      uint32_t writable = static_cast<uint32_t>(data_.size()) - write_pos;
      auto read_result = recv_into(data_.data() + write_pos, writable);
      if (FORY_PREDICT_FALSE(!read_result.ok())) {
        return Unexpected(std::move(read_result).error());
      }
      uint32_t read_bytes = std::move(read_result).value();
      if (read_bytes == 0) {
        return Unexpected(Error::buffer_out_of_bound(read_pos, min_fill_size,
                                                     remaining_size()));
      }
      write_pos += read_bytes;
      buffer_->size_ = write_pos;
    }
    return Result<void, Error>();
  }

  Result<void, Error> read_to(uint8_t *dst, uint32_t length) override {
    if (length == 0) {
      return Result<void, Error>();
    }
    Error error;
    if (FORY_PREDICT_FALSE(!buffer_->ensure_readable(length, error))) {
      return Unexpected(std::move(error));
    }
    std::memcpy(dst, buffer_->data_ + buffer_->reader_index_,
                static_cast<size_t>(length));
    buffer_->reader_index_ += length;
    return Result<void, Error>();
  }

  Result<void, Error> skip(uint32_t size) override {
    if (size == 0) {
      return Result<void, Error>();
    }
    Error error;
    buffer_->increase_reader_index(size, error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      return Unexpected(std::move(error));
    }
    return Result<void, Error>();
  }

  void shrink_buffer() override {
    if (buffer_ == nullptr) {
      return;
    }

    const uint32_t read_pos = buffer_->reader_index_;
    // Keep Python-backed InputStream shrink behavior aligned with C++:
    // best-effort compaction only after both the global floor (4096) and the
    // configured stream buffer size threshold are crossed.
    if (FORY_PREDICT_TRUE(read_pos <= 4096 ||
                          read_pos < initial_buffer_size_)) {
      return;
    }

    const uint32_t remaining = remaining_size();
    if (read_pos > 0) {
      if (remaining > 0) {
        std::memmove(data_.data(), data_.data() + read_pos,
                     static_cast<size_t>(remaining));
      }
      buffer_->reader_index_ = 0;
      buffer_->size_ = remaining;
      buffer_->writer_index_ = remaining;
    }

    const uint32_t current_capacity = static_cast<uint32_t>(data_.size());
    uint32_t target_capacity = current_capacity;
    if (current_capacity > initial_buffer_size_) {
      if (remaining == 0) {
        target_capacity = initial_buffer_size_;
      } else if (remaining <= current_capacity / 4) {
        const uint32_t doubled =
            remaining > std::numeric_limits<uint32_t>::max() / 2
                ? std::numeric_limits<uint32_t>::max()
                : remaining * 2;
        target_capacity = std::max<uint32_t>(
            initial_buffer_size_,
            std::max<uint32_t>(doubled, static_cast<uint32_t>(1)));
      }
    }
    if (target_capacity < current_capacity) {
      data_.resize(target_capacity);
      data_.shrink_to_fit();
      buffer_->data_ = data_.data();
    }
  }

  Result<void, Error> unread(uint32_t size) override {
    if (FORY_PREDICT_FALSE(size > buffer_->reader_index_)) {
      return Unexpected(Error::buffer_out_of_bound(buffer_->reader_index_, size,
                                                   buffer_->size_));
    }
    buffer_->reader_index_ -= size;
    return Result<void, Error>();
  }

  Buffer &get_buffer() override { return *buffer_; }

  void bind_buffer(Buffer *buffer) override {
    Buffer *target = buffer == nullptr ? owned_buffer_.get() : buffer;
    if (target == nullptr) {
      if (buffer_ != nullptr) {
        buffer_->input_stream_ = nullptr;
      }
      buffer_ = nullptr;
      return;
    }

    if (buffer_ == target) {
      buffer_->data_ = data_.data();
      buffer_->own_data_ = false;
      buffer_->wrapped_vector_ = nullptr;
      buffer_->input_stream_ = this;
      return;
    }

    Buffer *source = buffer_;
    if (source != nullptr) {
      target->size_ = source->size_;
      target->writer_index_ = source->writer_index_;
      target->reader_index_ = source->reader_index_;
      source->input_stream_ = nullptr;
    } else {
      target->size_ = 0;
      target->writer_index_ = 0;
      target->reader_index_ = 0;
    }

    buffer_ = target;
    buffer_->data_ = data_.data();
    buffer_->own_data_ = false;
    buffer_->wrapped_vector_ = nullptr;
    buffer_->input_stream_ = this;
  }

private:
  Result<uint32_t, Error> recv_into(void *dst, uint32_t length) {
    if (length == 0) {
      return 0U;
    }
    PyGILState_STATE gil_state = PyGILState_Ensure();
    PyObject *memory_view =
        PyMemoryView_FromMemory(reinterpret_cast<char *>(dst),
                                static_cast<Py_ssize_t>(length), PyBUF_WRITE);
    if (memory_view == nullptr) {
      std::string message = fetch_python_error_message();
      PyGILState_Release(gil_state);
      return Unexpected(Error::io_error(message));
    }
    PyObject *read_bytes_obj = nullptr;
    switch (read_method_) {
    case PythonStreamReadMethod::ReadInto:
      read_bytes_obj =
          PyObject_CallMethod(stream_, read_method_name_, "O", memory_view);
      break;
    case PythonStreamReadMethod::RecvInto:
    case PythonStreamReadMethod::RecvIntoUnderscore:
      read_bytes_obj =
          PyObject_CallMethod(stream_, read_method_name_, "On", memory_view,
                              static_cast<Py_ssize_t>(length));
      break;
    }
    Py_DECREF(memory_view);
    if (read_bytes_obj == nullptr) {
      std::string message = fetch_python_error_message();
      PyGILState_Release(gil_state);
      return Unexpected(Error::io_error(message));
    }

    Py_ssize_t read_bytes = PyLong_AsSsize_t(read_bytes_obj);
    Py_DECREF(read_bytes_obj);
    if (read_bytes == -1 && PyErr_Occurred()) {
      std::string message = fetch_python_error_message();
      PyGILState_Release(gil_state);
      return Unexpected(Error::io_error(message));
    }
    PyGILState_Release(gil_state);
    if (read_bytes < 0 ||
        static_cast<uint64_t>(read_bytes) > static_cast<uint64_t>(length)) {
      return Unexpected(Error::io_error("python stream " +
                                        std::string(read_method_name_) +
                                        " returned invalid length"));
    }
    return static_cast<uint32_t>(read_bytes);
  }

  uint32_t remaining_size() const {
    return buffer_->size_ - buffer_->reader_index_;
  }

  void reserve(uint32_t new_size) {
    data_.resize(new_size);
    buffer_->data_ = data_.data();
  }

  PyObject *stream_ = nullptr;
  PythonStreamReadMethod read_method_;
  const char *read_method_name_ = nullptr;
  std::vector<uint8_t> data_;
  uint32_t initial_buffer_size_ = 1;
  Buffer *buffer_ = nullptr;
  std::unique_ptr<Buffer> owned_buffer_;
};

enum class PythonCollectionKind : uint8_t {
  List = 0,
  Tuple = 1,
  Set = 2,
};

static PythonCollectionKind
resolve_python_collection_kind(PyObject *collection) {
  if (PyList_CheckExact(collection)) {
    return PythonCollectionKind::List;
  }
  if (PyTuple_CheckExact(collection)) {
    return PythonCollectionKind::Tuple;
  }
  if (FORY_PY_SET_CHECK_EXACT(collection)) {
    return PythonCollectionKind::Set;
  }
  PyErr_Format(PyExc_TypeError,
               "fastpath only supports list/tuple/set collections, got %.200s",
               Py_TYPE(collection)->tp_name);
  return PythonCollectionKind::List;
}

static void set_buffer_error(const Error &error) {
  PyErr_SetString(PyExc_BufferError, error.to_string().c_str());
}

static bool py_long_to_int64(PyObject *value, int64_t *out) {
  int overflow = 0;
  long long converted = PyLong_AsLongLongAndOverflow(value, &overflow);
  if (converted == -1 && PyErr_Occurred()) {
    return false;
  }
  if (overflow != 0) {
    PyErr_SetString(PyExc_OverflowError,
                    "integer out of range for int64 fastpath");
    return false;
  }
  *out = static_cast<int64_t>(converted);
  return true;
}

static bool can_use_list_sequence_fastpath(PyObject **items, Py_ssize_t size,
                                           uint8_t type_id) {
  // This gate is not only about type checks:
  // it enforces "no Python callback during conversion" for the raw ob_item
  // path. If conversion can invoke user code (e.g. __int__/__float__/subclass
  // hooks), a list may be mutated while iterating raw pointers, which is
  // unsafe.
  switch (static_cast<TypeId>(type_id)) {
  case TypeId::STRING:
    for (Py_ssize_t i = 0; i < size; ++i) {
      if (!PyUnicode_CheckExact(items[i])) {
        return false;
      }
    }
    return true;
  case TypeId::VARINT64:
  case TypeId::INT64:
  case TypeId::TAGGED_INT64:
  case TypeId::VARINT32:
  case TypeId::INT32:
  case TypeId::VAR_UINT64:
  case TypeId::UINT64:
  case TypeId::TAGGED_UINT64:
  case TypeId::VAR_UINT32:
  case TypeId::UINT32:
  case TypeId::UINT8:
  case TypeId::INT8:
  case TypeId::UINT16:
  case TypeId::INT16:
    for (Py_ssize_t i = 0; i < size; ++i) {
      if (!PyLong_CheckExact(items[i])) {
        return false;
      }
    }
    return true;
  case TypeId::BOOL:
    for (Py_ssize_t i = 0; i < size; ++i) {
      if (items[i] != Py_True && items[i] != Py_False) {
        return false;
      }
    }
    return true;
  case TypeId::FLOAT32:
  case TypeId::FLOAT64:
    for (Py_ssize_t i = 0; i < size; ++i) {
      if (!PyFloat_CheckExact(items[i])) {
        return false;
      }
    }
    return true;
  default:
    return false;
  }
}

template <typename T>
static bool py_long_to_integral_range(PyObject *value, const char *type_name,
                                      T *out) {
  int64_t converted = 0;
  if (!py_long_to_int64(value, &converted)) {
    return false;
  }
  constexpr int64_t k_min = static_cast<int64_t>(std::numeric_limits<T>::min());
  constexpr int64_t k_max = static_cast<int64_t>(std::numeric_limits<T>::max());
  if (converted < k_min || converted > k_max) {
    PyErr_Format(PyExc_OverflowError, "integer out of range for %s", type_name);
    return false;
  }
  *out = static_cast<T>(converted);
  return true;
}

template <typename T>
static bool py_long_to_unsigned_range(PyObject *value, const char *type_name,
                                      T *out) {
  const unsigned long long converted = PyLong_AsUnsignedLongLong(value);
  if (converted == static_cast<unsigned long long>(-1) &&
      PyErr_Occurred() != nullptr) {
    return false;
  }
  constexpr unsigned long long k_max =
      static_cast<unsigned long long>(std::numeric_limits<T>::max());
  if (converted > k_max) {
    PyErr_Format(PyExc_OverflowError, "integer out of range for %s", type_name);
    return false;
  }
  *out = static_cast<T>(converted);
  return true;
}

static int write_python_string(Buffer *buffer, PyObject *value) {
  if (FORY_PREDICT_FALSE(!PyUnicode_Check(value))) {
    PyErr_Format(PyExc_TypeError, "expected str, got %.200s",
                 Py_TYPE(value)->tp_name);
    return -1;
  }
  if (FORY_PREDICT_FALSE(PyUnicode_READY(value) < 0)) {
    return -1;
  }
  const Py_ssize_t length = PyUnicode_GET_LENGTH(value);
  const int kind = PyUnicode_KIND(value);
  const void *data = PyUnicode_DATA(value);
  uint64_t header = 0;
  uint32_t buffer_size = 0;

  if (kind == PyUnicode_1BYTE_KIND) {
    if (FORY_PREDICT_FALSE(length > std::numeric_limits<uint32_t>::max())) {
      PyErr_SetString(PyExc_OverflowError,
                      "string too large for fastpath encoding");
      return -1;
    }
    buffer_size = static_cast<uint32_t>(length);
    header = (static_cast<uint64_t>(length) << 2U) | 0ULL;
  } else if (kind == PyUnicode_2BYTE_KIND) {
    const uint64_t bytes = static_cast<uint64_t>(length) << 1U;
    if (FORY_PREDICT_FALSE(bytes > std::numeric_limits<uint32_t>::max())) {
      PyErr_SetString(PyExc_OverflowError,
                      "string too large for fastpath encoding");
      return -1;
    }
    buffer_size = static_cast<uint32_t>(bytes);
    // Keep wire format exactly aligned with Buffer.write_string in buffer.pyx.
    header = (static_cast<uint64_t>(length) << 3U) | 1ULL;
  } else {
    Py_ssize_t utf8_size = 0;
    const char *utf8 = PyUnicode_AsUTF8AndSize(value, &utf8_size);
    if (FORY_PREDICT_FALSE(utf8 == nullptr)) {
      return -1;
    }
    if (FORY_PREDICT_FALSE(utf8_size > std::numeric_limits<uint32_t>::max())) {
      PyErr_SetString(PyExc_OverflowError,
                      "string too large for fastpath encoding");
      return -1;
    }
    data = utf8;
    buffer_size = static_cast<uint32_t>(utf8_size);
    header = (static_cast<uint64_t>(buffer_size) << 2U) | 2ULL;
  }

  buffer->write_var_uint64(header);
  if (buffer_size == 0) {
    return 0;
  }
  const uint32_t writer_index = buffer->writer_index();
  buffer->grow(buffer_size);
  buffer->unsafe_put(writer_index, data, buffer_size);
  buffer->increase_writer_index(buffer_size);
  return 0;
}

static PyObject *read_python_string(Buffer *buffer) {
  Error error;
  const uint64_t header = buffer->read_var_uint64(error);
  if (FORY_PREDICT_FALSE(!error.ok())) {
    set_buffer_error(error);
    return nullptr;
  }
  const uint64_t size64 = header >> 2U;
  if (FORY_PREDICT_FALSE(size64 > std::numeric_limits<uint32_t>::max())) {
    PyErr_SetString(PyExc_OverflowError,
                    "string length too large for fastpath decoding");
    return nullptr;
  }
  const uint32_t size = static_cast<uint32_t>(size64);
  const uint32_t encoding = static_cast<uint32_t>(header & 0b11ULL);
  if (size == 0) {
    return PyUnicode_FromStringAndSize("", 0);
  }

  uint32_t reader_index = buffer->reader_index();
  if (FORY_PREDICT_FALSE(size > buffer->size() - reader_index)) {
    if (FORY_PREDICT_FALSE(!buffer->ensure_readable(size, error))) {
      set_buffer_error(error);
      return nullptr;
    }
    reader_index = buffer->reader_index();
  }
  const char *data =
      reinterpret_cast<const char *>(buffer->data() + reader_index);
  buffer->reader_index(reader_index + size);

  if (encoding == 0) {
    return PyUnicode_DecodeLatin1(data, static_cast<Py_ssize_t>(size),
                                  "strict");
  }
  if (encoding == 1) {
    if (FORY_PREDICT_FALSE((size & 1U) != 0U)) {
      PyErr_SetString(PyExc_ValueError, "invalid utf16 string length");
      return nullptr;
    }
    const auto *utf16_data = reinterpret_cast<const uint16_t *>(data);
    if (utf16_has_surrogate_pairs(utf16_data,
                                  static_cast<size_t>(size >> 1U))) {
      int byteorder = -1; // little-endian
      return PyUnicode_DecodeUTF16(data, static_cast<Py_ssize_t>(size),
                                   "strict", &byteorder);
    }
    return PyUnicode_FromKindAndData(PyUnicode_2BYTE_KIND, data,
                                     static_cast<Py_ssize_t>(size >> 1U));
  }
  if (encoding == 2) {
    return PyUnicode_DecodeUTF8(data, static_cast<Py_ssize_t>(size), "strict");
  }
  PyErr_Format(PyExc_ValueError, "unsupported string encoding tag: %u",
               encoding);
  return nullptr;
}

static int write_primitive_item(Buffer *buffer, PyObject *value,
                                uint8_t type_id) {
  switch (static_cast<TypeId>(type_id)) {
  case TypeId::STRING:
    return write_python_string(buffer, value);
  case TypeId::VARINT64:
  case TypeId::INT64: {
    int64_t v = 0;
    if (FORY_PREDICT_FALSE(!py_long_to_int64(value, &v))) {
      return -1;
    }
    if (static_cast<TypeId>(type_id) == TypeId::INT64) {
      buffer->write_int64(v);
    } else {
      buffer->write_var_int64(v);
    }
    return 0;
  }
  case TypeId::VARINT32:
  case TypeId::INT32: {
    int32_t v = 0;
    if (FORY_PREDICT_FALSE(
            !py_long_to_integral_range<int32_t>(value, "int32", &v))) {
      return -1;
    }
    if (static_cast<TypeId>(type_id) == TypeId::INT32) {
      buffer->write_int32(v);
    } else {
      buffer->write_var_int32(v);
    }
    return 0;
  }
  case TypeId::VAR_UINT64:
  case TypeId::UINT64:
  case TypeId::TAGGED_UINT64: {
    uint64_t v = 0;
    if (FORY_PREDICT_FALSE(
            !py_long_to_unsigned_range<uint64_t>(value, "uint64", &v))) {
      return -1;
    }
    if (static_cast<TypeId>(type_id) == TypeId::VAR_UINT64) {
      buffer->write_var_uint64(v);
    } else if (static_cast<TypeId>(type_id) == TypeId::TAGGED_UINT64) {
      buffer->write_tagged_uint64(v);
    } else {
      buffer->write_int64(static_cast<int64_t>(v));
    }
    return 0;
  }
  case TypeId::VAR_UINT32:
  case TypeId::UINT32: {
    uint32_t v = 0;
    if (FORY_PREDICT_FALSE(
            !py_long_to_unsigned_range<uint32_t>(value, "uint32", &v))) {
      return -1;
    }
    if (static_cast<TypeId>(type_id) == TypeId::VAR_UINT32) {
      buffer->write_var_uint32(v);
    } else {
      buffer->write_uint32(v);
    }
    return 0;
  }
  case TypeId::BOOL:
    if (value == Py_True) {
      buffer->write_int8(1);
      return 0;
    }
    if (value == Py_False) {
      buffer->write_int8(0);
      return 0;
    }
    {
      const int truthy = PyObject_IsTrue(value);
      if (FORY_PREDICT_FALSE(truthy < 0)) {
        return -1;
      }
      buffer->write_int8(truthy ? 1 : 0);
    }
    return 0;
  case TypeId::FLOAT32:
  case TypeId::FLOAT64: {
    double v;
    if (PyFloat_CheckExact(value)) {
      v = reinterpret_cast<PyFloatObject *>(value)->ob_fval;
    } else {
      v = PyFloat_AsDouble(value);
      if (FORY_PREDICT_FALSE(v == -1.0 && PyErr_Occurred())) {
        return -1;
      }
    }
    if (static_cast<TypeId>(type_id) == TypeId::FLOAT32) {
      buffer->write_float(static_cast<float>(v));
    } else {
      buffer->write_double(v);
    }
    return 0;
  }
  case TypeId::UINT8: {
    uint8_t v = 0;
    if (FORY_PREDICT_FALSE(
            !py_long_to_unsigned_range<uint8_t>(value, "uint8", &v))) {
      return -1;
    }
    buffer->write_uint8(v);
    return 0;
  }
  case TypeId::INT8: {
    int8_t v = 0;
    if (FORY_PREDICT_FALSE(
            !py_long_to_integral_range<int8_t>(value, "int8", &v))) {
      return -1;
    }
    buffer->write_int8(v);
    return 0;
  }
  case TypeId::UINT16: {
    uint16_t v = 0;
    if (FORY_PREDICT_FALSE(
            !py_long_to_unsigned_range<uint16_t>(value, "uint16", &v))) {
      return -1;
    }
    buffer->write_uint16(v);
    return 0;
  }
  case TypeId::INT16: {
    int16_t v = 0;
    if (FORY_PREDICT_FALSE(
            !py_long_to_integral_range<int16_t>(value, "int16", &v))) {
      return -1;
    }
    buffer->write_int16(v);
    return 0;
  }
  case TypeId::TAGGED_INT64: {
    int64_t v = 0;
    if (FORY_PREDICT_FALSE(
            !py_long_to_integral_range<int64_t>(value, "int64", &v))) {
      return -1;
    }
    buffer->write_tagged_int64(v);
    return 0;
  }
  default:
    PyErr_Format(PyExc_ValueError, "unsupported primitive fastpath type id: %u",
                 static_cast<unsigned>(type_id));
    return -1;
  }
}

static int write_primitive_sequence(PyObject **items, Py_ssize_t size,
                                    Buffer *buffer, uint8_t type_id) {
  switch (static_cast<TypeId>(type_id)) {
  case TypeId::STRING:
    for (Py_ssize_t i = 0; i < size; ++i) {
      if (FORY_PREDICT_FALSE(write_python_string(buffer, items[i]) != 0)) {
        return -1;
      }
    }
    return 0;
  case TypeId::VARINT64:
    if (FORY_PREDICT_FALSE(static_cast<uint64_t>(size) >
                           std::numeric_limits<uint32_t>::max() / 9ULL)) {
      PyErr_SetString(PyExc_OverflowError, "varint64 collection too large");
      return -1;
    }
    {
      const uint32_t max_byte_size = static_cast<uint32_t>(size) * 9U;
      const uint32_t writer_index = buffer->writer_index();
      buffer->grow(max_byte_size);
      uint32_t offset = writer_index;
      for (Py_ssize_t i = 0; i < size; ++i) {
        int64_t v = 0;
        if (FORY_PREDICT_FALSE(!py_long_to_int64(items[i], &v))) {
          return -1;
        }
        const uint64_t zigzag =
            (static_cast<uint64_t>(v) << 1) ^ static_cast<uint64_t>(v >> 63);
        offset += buffer->put_var_uint64(offset, zigzag);
      }
      buffer->increase_writer_index(offset - writer_index);
    }
    return 0;
  case TypeId::VARINT32:
    if (FORY_PREDICT_FALSE(static_cast<uint64_t>(size) >
                           std::numeric_limits<uint32_t>::max() / 5ULL)) {
      PyErr_SetString(PyExc_OverflowError, "varint32 collection too large");
      return -1;
    }
    {
      const uint32_t max_byte_size = static_cast<uint32_t>(size) * 5U;
      const uint32_t writer_index = buffer->writer_index();
      buffer->grow(max_byte_size);
      uint32_t offset = writer_index;
      for (Py_ssize_t i = 0; i < size; ++i) {
        int32_t v = 0;
        if (FORY_PREDICT_FALSE(
                !py_long_to_integral_range<int32_t>(items[i], "int32", &v))) {
          return -1;
        }
        const uint32_t zigzag =
            (static_cast<uint32_t>(v) << 1) ^ static_cast<uint32_t>(v >> 31);
        offset += buffer->put_var_uint32(offset, zigzag);
      }
      buffer->increase_writer_index(offset - writer_index);
    }
    return 0;
  case TypeId::BOOL: {
    const uint64_t byte_size64 = static_cast<uint64_t>(size) * sizeof(bool);
    if (FORY_PREDICT_FALSE(byte_size64 >
                           std::numeric_limits<uint32_t>::max())) {
      PyErr_SetString(PyExc_OverflowError, "bool collection too large");
      return -1;
    }
    const uint32_t byte_size = static_cast<uint32_t>(byte_size64);
    const uint32_t writer_index = buffer->writer_index();
    buffer->grow(byte_size);
    uint32_t offset = writer_index;
    for (Py_ssize_t i = 0; i < size; ++i) {
      buffer->unsafe_put_byte(offset++,
                              static_cast<uint8_t>(items[i] == Py_True));
    }
    buffer->increase_writer_index(byte_size);
    return 0;
  }
  case TypeId::FLOAT64: {
    const uint64_t byte_size64 = static_cast<uint64_t>(size) * sizeof(double);
    if (FORY_PREDICT_FALSE(byte_size64 >
                           std::numeric_limits<uint32_t>::max())) {
      PyErr_SetString(PyExc_OverflowError, "float collection too large");
      return -1;
    }
    const uint32_t byte_size = static_cast<uint32_t>(byte_size64);
    const uint32_t writer_index = buffer->writer_index();
    buffer->grow(byte_size);
    uint32_t offset = writer_index;
    for (Py_ssize_t i = 0; i < size; ++i) {
      PyObject *value = items[i];
      double v;
      if (PyFloat_CheckExact(value)) {
        v = reinterpret_cast<PyFloatObject *>(value)->ob_fval;
      } else {
        v = PyFloat_AsDouble(value);
        if (FORY_PREDICT_FALSE(v == -1.0 && PyErr_Occurred())) {
          return -1;
        }
      }
      buffer->unsafe_put(offset, v);
      offset += sizeof(double);
    }
    buffer->increase_writer_index(byte_size);
    return 0;
  }
  case TypeId::INT8: {
    const uint64_t byte_size64 = static_cast<uint64_t>(size) * sizeof(int8_t);
    if (FORY_PREDICT_FALSE(byte_size64 >
                           std::numeric_limits<uint32_t>::max())) {
      PyErr_SetString(PyExc_OverflowError, "int8 collection too large");
      return -1;
    }
    const uint32_t byte_size = static_cast<uint32_t>(byte_size64);
    const uint32_t writer_index = buffer->writer_index();
    buffer->grow(byte_size);
    uint32_t offset = writer_index;
    for (Py_ssize_t i = 0; i < size; ++i) {
      int8_t v = 0;
      if (FORY_PREDICT_FALSE(
              !py_long_to_integral_range<int8_t>(items[i], "int8", &v))) {
        return -1;
      }
      buffer->unsafe_put_byte(offset++, v);
    }
    buffer->increase_writer_index(byte_size);
    return 0;
  }
  case TypeId::INT16: {
    const uint64_t byte_size64 = static_cast<uint64_t>(size) * sizeof(int16_t);
    if (FORY_PREDICT_FALSE(byte_size64 >
                           std::numeric_limits<uint32_t>::max())) {
      PyErr_SetString(PyExc_OverflowError, "int16 collection too large");
      return -1;
    }
    const uint32_t byte_size = static_cast<uint32_t>(byte_size64);
    const uint32_t writer_index = buffer->writer_index();
    buffer->grow(byte_size);
    uint32_t offset = writer_index;
    for (Py_ssize_t i = 0; i < size; ++i) {
      int16_t v = 0;
      if (FORY_PREDICT_FALSE(
              !py_long_to_integral_range<int16_t>(items[i], "int16", &v))) {
        return -1;
      }
      buffer->unsafe_put(offset, v);
      offset += sizeof(int16_t);
    }
    buffer->increase_writer_index(byte_size);
    return 0;
  }
  case TypeId::INT32: {
    const uint64_t byte_size64 = static_cast<uint64_t>(size) * sizeof(int32_t);
    if (FORY_PREDICT_FALSE(byte_size64 >
                           std::numeric_limits<uint32_t>::max())) {
      PyErr_SetString(PyExc_OverflowError, "int32 collection too large");
      return -1;
    }
    const uint32_t byte_size = static_cast<uint32_t>(byte_size64);
    const uint32_t writer_index = buffer->writer_index();
    buffer->grow(byte_size);
    uint32_t offset = writer_index;
    for (Py_ssize_t i = 0; i < size; ++i) {
      int32_t v = 0;
      if (FORY_PREDICT_FALSE(
              !py_long_to_integral_range<int32_t>(items[i], "int32", &v))) {
        return -1;
      }
      buffer->unsafe_put(offset, v);
      offset += sizeof(int32_t);
    }
    buffer->increase_writer_index(byte_size);
    return 0;
  }
  default:
    for (Py_ssize_t i = 0; i < size; ++i) {
      if (FORY_PREDICT_FALSE(write_primitive_item(buffer, items[i], type_id) !=
                             0)) {
        return -1;
      }
    }
    return 0;
  }
}

static PyObject *read_primitive_item(Buffer *buffer, uint8_t type_id) {
  Error error;
  switch (static_cast<TypeId>(type_id)) {
  case TypeId::STRING:
    return read_python_string(buffer);
  case TypeId::VARINT64:
  case TypeId::INT64: {
    const int64_t v = static_cast<TypeId>(type_id) == TypeId::INT64
                          ? buffer->read_int64(error)
                          : buffer->read_var_int64(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyLong_FromLongLong(v);
  }
  case TypeId::VARINT32:
  case TypeId::INT32: {
    const int32_t v = static_cast<TypeId>(type_id) == TypeId::INT32
                          ? buffer->read_int32(error)
                          : buffer->read_var_int32(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyLong_FromLong(v);
  }
  case TypeId::VAR_UINT64:
  case TypeId::UINT64:
  case TypeId::TAGGED_UINT64: {
    uint64_t v = 0;
    if (static_cast<TypeId>(type_id) == TypeId::VAR_UINT64) {
      v = buffer->read_var_uint64(error);
    } else if (static_cast<TypeId>(type_id) == TypeId::TAGGED_UINT64) {
      v = buffer->read_tagged_uint64(error);
    } else {
      v = buffer->read_uint64(error);
    }
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyLong_FromUnsignedLongLong(v);
  }
  case TypeId::VAR_UINT32:
  case TypeId::UINT32: {
    const uint32_t v = static_cast<TypeId>(type_id) == TypeId::UINT32
                           ? buffer->read_uint32(error)
                           : buffer->read_var_uint32(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyLong_FromUnsignedLong(v);
  }
  case TypeId::BOOL: {
    const uint8_t v = buffer->read_uint8(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyBool_FromLong(v != 0);
  }
  case TypeId::FLOAT32: {
    const float v = buffer->read_float(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyFloat_FromDouble(static_cast<double>(v));
  }
  case TypeId::FLOAT64: {
    const double v = buffer->read_double(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyFloat_FromDouble(v);
  }
  case TypeId::UINT8: {
    const uint8_t v = buffer->read_uint8(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyLong_FromUnsignedLong(v);
  }
  case TypeId::INT8: {
    const int8_t v = buffer->read_int8(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyLong_FromLong(v);
  }
  case TypeId::UINT16: {
    const uint16_t v = buffer->read_uint16(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyLong_FromUnsignedLong(v);
  }
  case TypeId::INT16: {
    const int16_t v = buffer->read_int16(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyLong_FromLong(v);
  }
  case TypeId::TAGGED_INT64: {
    const int64_t v = buffer->read_tagged_int64(error);
    if (FORY_PREDICT_FALSE(!error.ok())) {
      set_buffer_error(error);
      return nullptr;
    }
    return PyLong_FromLongLong(v);
  }
  default:
    PyErr_Format(PyExc_ValueError, "unsupported primitive fastpath type id: %u",
                 static_cast<unsigned>(type_id));
    return nullptr;
  }
}

template <typename SetItemFn>
static int read_primitive_sequence_indexed(Buffer *buffer, Py_ssize_t size,
                                           uint8_t type_id,
                                           SetItemFn set_item) {
  Error error;
  switch (static_cast<TypeId>(type_id)) {
  case TypeId::STRING:
    for (Py_ssize_t i = 0; i < size; ++i) {
      PyObject *item = read_python_string(buffer);
      if (FORY_PREDICT_FALSE(item == nullptr)) {
        return -1;
      }
      set_item(i, item);
    }
    return 0;
  case TypeId::VARINT64: {
    const uint8_t *data = buffer->data();
    const uint8_t *ptr = data + buffer->reader_index();
    const uint8_t *end = data + buffer->size();
    for (Py_ssize_t i = 0; i < size; ++i) {
      if (FORY_PREDICT_FALSE(ptr >= end)) {
        PyErr_SetString(PyExc_BufferError,
                        "buffer out of bound while reading varint64");
        return -1;
      }
      uint64_t raw = 0;
      const uint8_t b0 = *ptr++;
      if ((b0 & 0x80) == 0) {
        raw = b0;
      } else {
        if (FORY_PREDICT_FALSE(ptr >= end)) {
          PyErr_SetString(PyExc_BufferError,
                          "buffer out of bound while reading varint64");
          return -1;
        }
        const uint8_t b1 = *ptr++;
        raw = static_cast<uint64_t>(b0 & 0x7F) |
              (static_cast<uint64_t>(b1 & 0x7F) << 7);
        if (FORY_PREDICT_FALSE((b1 & 0x80) != 0)) {
          const uint32_t offset = static_cast<uint32_t>((ptr - data) - 2);
          uint32_t read_bytes = 0;
          raw = buffer->get_var_uint64(offset, &read_bytes);
          if (FORY_PREDICT_FALSE(read_bytes == 0)) {
            PyErr_SetString(PyExc_BufferError,
                            "buffer out of bound while reading varint64");
            return -1;
          }
          ptr = data + offset + read_bytes;
        }
      }
      const int64_t v =
          static_cast<int64_t>((raw >> 1) ^ -static_cast<int64_t>(raw & 1ULL));
      PyObject *item = PyLong_FromLongLong(v);
      if (FORY_PREDICT_FALSE(item == nullptr)) {
        return -1;
      }
      set_item(i, item);
    }
    buffer->reader_index(static_cast<uint32_t>(ptr - data));
  }
    return 0;
  case TypeId::VARINT32: {
    const uint8_t *data = buffer->data();
    const uint8_t *ptr = data + buffer->reader_index();
    const uint8_t *end = data + buffer->size();
    for (Py_ssize_t i = 0; i < size; ++i) {
      if (FORY_PREDICT_FALSE(ptr >= end)) {
        PyErr_SetString(PyExc_BufferError,
                        "buffer out of bound while reading varint32");
        return -1;
      }
      uint32_t raw = 0;
      const uint8_t b0 = *ptr++;
      if ((b0 & 0x80) == 0) {
        raw = b0;
      } else {
        if (FORY_PREDICT_FALSE(ptr >= end)) {
          PyErr_SetString(PyExc_BufferError,
                          "buffer out of bound while reading varint32");
          return -1;
        }
        const uint8_t b1 = *ptr++;
        raw = static_cast<uint32_t>(b0 & 0x7F) |
              (static_cast<uint32_t>(b1 & 0x7F) << 7);
        if (FORY_PREDICT_FALSE((b1 & 0x80) != 0)) {
          const uint32_t offset = static_cast<uint32_t>((ptr - data) - 2);
          uint32_t read_bytes = 0;
          raw = buffer->get_var_uint32(offset, &read_bytes);
          if (FORY_PREDICT_FALSE(read_bytes == 0)) {
            PyErr_SetString(PyExc_BufferError,
                            "buffer out of bound while reading varint32");
            return -1;
          }
          ptr = data + offset + read_bytes;
        }
      }
      const int32_t v =
          static_cast<int32_t>((raw >> 1) ^ -static_cast<int32_t>(raw & 1U));
      PyObject *item = PyLong_FromLong(v);
      if (FORY_PREDICT_FALSE(item == nullptr)) {
        return -1;
      }
      set_item(i, item);
    }
    buffer->reader_index(static_cast<uint32_t>(ptr - data));
  }
    return 0;
  case TypeId::BOOL:
    if (FORY_PREDICT_FALSE(static_cast<uint64_t>(size) >
                           buffer->remaining_size())) {
      PyErr_SetString(PyExc_BufferError,
                      "buffer out of bound while reading bool");
      return -1;
    }
    {
      uint32_t offset = buffer->reader_index();
      const uint8_t *data = buffer->data() + offset;
      for (Py_ssize_t i = 0; i < size; ++i) {
        PyObject *item =
            data[i] != 0 ? FORY_PY_NEWREF(Py_True) : FORY_PY_NEWREF(Py_False);
        set_item(i, item);
      }
      buffer->reader_index(offset + static_cast<uint32_t>(size));
    }
    return 0;
  case TypeId::FLOAT64:
    if (FORY_PREDICT_FALSE(static_cast<uint64_t>(size) >
                           buffer->remaining_size() / sizeof(double))) {
      PyErr_SetString(PyExc_BufferError,
                      "buffer out of bound while reading float64");
      return -1;
    }
    {
      uint32_t offset = buffer->reader_index();
      const uint8_t *data = buffer->data() + offset;
      for (Py_ssize_t i = 0; i < size; ++i) {
        double v;
        std::memcpy(&v, data + i * sizeof(double), sizeof(double));
        PyObject *item = PyFloat_FromDouble(v);
        if (FORY_PREDICT_FALSE(item == nullptr)) {
          return -1;
        }
        set_item(i, item);
      }
      buffer->reader_index(offset +
                           static_cast<uint32_t>(size * sizeof(double)));
    }
    return 0;
  case TypeId::INT8:
    if (FORY_PREDICT_FALSE(static_cast<uint64_t>(size) >
                           buffer->remaining_size())) {
      PyErr_SetString(PyExc_BufferError,
                      "buffer out of bound while reading int8");
      return -1;
    }
    {
      uint32_t offset = buffer->reader_index();
      const int8_t *data =
          reinterpret_cast<const int8_t *>(buffer->data() + offset);
      for (Py_ssize_t i = 0; i < size; ++i) {
        PyObject *item = PyLong_FromLong(data[i]);
        if (FORY_PREDICT_FALSE(item == nullptr)) {
          return -1;
        }
        set_item(i, item);
      }
      buffer->reader_index(offset + static_cast<uint32_t>(size));
    }
    return 0;
  case TypeId::INT16:
    if (FORY_PREDICT_FALSE(static_cast<uint64_t>(size) >
                           buffer->remaining_size() / sizeof(int16_t))) {
      PyErr_SetString(PyExc_BufferError,
                      "buffer out of bound while reading int16");
      return -1;
    }
    {
      uint32_t offset = buffer->reader_index();
      const uint8_t *data = buffer->data() + offset;
      for (Py_ssize_t i = 0; i < size; ++i) {
        int16_t v;
        std::memcpy(&v, data + i * sizeof(int16_t), sizeof(int16_t));
        PyObject *item = PyLong_FromLong(v);
        if (FORY_PREDICT_FALSE(item == nullptr)) {
          return -1;
        }
        set_item(i, item);
      }
      buffer->reader_index(offset +
                           static_cast<uint32_t>(size * sizeof(int16_t)));
    }
    return 0;
  case TypeId::INT32:
    if (FORY_PREDICT_FALSE(static_cast<uint64_t>(size) >
                           buffer->remaining_size() / sizeof(int32_t))) {
      PyErr_SetString(PyExc_BufferError,
                      "buffer out of bound while reading int32");
      return -1;
    }
    {
      uint32_t offset = buffer->reader_index();
      const uint8_t *data = buffer->data() + offset;
      for (Py_ssize_t i = 0; i < size; ++i) {
        int32_t v;
        std::memcpy(&v, data + i * sizeof(int32_t), sizeof(int32_t));
        PyObject *item = PyLong_FromLong(v);
        if (FORY_PREDICT_FALSE(item == nullptr)) {
          return -1;
        }
        set_item(i, item);
      }
      buffer->reader_index(offset +
                           static_cast<uint32_t>(size * sizeof(int32_t)));
    }
    return 0;
  default:
    for (Py_ssize_t i = 0; i < size; ++i) {
      PyObject *item = read_primitive_item(buffer, type_id);
      if (FORY_PREDICT_FALSE(item == nullptr)) {
        return -1;
      }
      set_item(i, item);
    }
    return 0;
  }
}

int Fory_PyPrimitiveCollectionWriteToBuffer(PyObject *collection,
                                            Buffer *buffer, uint8_t type_id) {
  PyObject **items = py_sequence_get_items(collection);
  if (items != nullptr) {
    const Py_ssize_t size = Py_SIZE(collection);
    // Refactor guard:
    // - tuple is immutable, so raw item pointer iteration is always safe.
    // - list is mutable, so use raw pointer path only when
    //   can_use_list_sequence_fastpath(...) proves callback-free conversion.
    // Otherwise we must fall back to iterator path for safety.
    if (!PyList_CheckExact(collection) ||
        can_use_list_sequence_fastpath(items, size, type_id)) {
      return write_primitive_sequence(items, size, buffer, type_id);
    }
  }
  PyObject *iterator = PyObject_GetIter(collection);
  if (FORY_PREDICT_FALSE(iterator == nullptr)) {
    return -1;
  }
  int rc = 0;
  for (PyObject *item = PyIter_Next(iterator); item != nullptr;
       item = PyIter_Next(iterator)) {
    if (FORY_PREDICT_FALSE(write_primitive_item(buffer, item, type_id) != 0)) {
      Py_DECREF(item);
      rc = -1;
      break;
    }
    Py_DECREF(item);
  }
  if (FORY_PREDICT_FALSE(rc == 0 && PyErr_Occurred() != nullptr)) {
    rc = -1;
  }
  Py_DECREF(iterator);
  return rc;
}

int Fory_PyPrimitiveCollectionReadFromBuffer(PyObject *collection,
                                             Buffer *buffer, Py_ssize_t size,
                                             uint8_t type_id) {
  if (FORY_PREDICT_FALSE(size < 0)) {
    PyErr_SetString(PyExc_ValueError, "negative collection size");
    return -1;
  }

  const PythonCollectionKind kind = resolve_python_collection_kind(collection);
  if (FORY_PREDICT_FALSE(PyErr_Occurred() != nullptr)) {
    return -1;
  }
  if (kind == PythonCollectionKind::List && Py_SIZE(collection) < size) {
    PyErr_SetString(PyExc_ValueError,
                    "list collection size is smaller than requested read size");
    return -1;
  }
  if (kind == PythonCollectionKind::Tuple && Py_SIZE(collection) < size) {
    PyErr_SetString(
        PyExc_ValueError,
        "tuple collection size is smaller than requested read size");
    return -1;
  }
  if (!buffer->has_input_stream() && kind == PythonCollectionKind::List) {
    return read_primitive_sequence_indexed(
        buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
          PyList_SET_ITEM(collection, i, item);
        });
  }
  if (!buffer->has_input_stream() && kind == PythonCollectionKind::Tuple) {
    return read_primitive_sequence_indexed(
        buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
          PyTuple_SET_ITEM(collection, i, item);
        });
  }

  for (Py_ssize_t i = 0; i < size; ++i) {
    PyObject *item = read_primitive_item(buffer, type_id);
    if (FORY_PREDICT_FALSE(item == nullptr)) {
      return -1;
    }
    if (kind == PythonCollectionKind::List) {
      PyList_SET_ITEM(collection, i, item);
    } else if (kind == PythonCollectionKind::Tuple) {
      PyTuple_SET_ITEM(collection, i, item);
    } else {
      if (FORY_PREDICT_FALSE(PySet_Add(collection, item) < 0)) {
        Py_DECREF(item);
        return -1;
      }
      Py_DECREF(item);
    }
  }
  return 0;
}

int Fory_PyWriteBasicFieldToBuffer(PyObject *value, Buffer *buffer,
                                   uint8_t type_id) {
  return write_primitive_item(buffer, value, type_id);
}

PyObject *Fory_PyReadBasicFieldFromBuffer(Buffer *buffer, uint8_t type_id) {
  return read_primitive_item(buffer, type_id);
}

int Fory_PyCreateBufferFromStream(PyObject *stream, uint32_t buffer_size,
                                  Buffer **out, std::string *error_message) {
  if (stream == nullptr) {
    *error_message = "stream must not be null";
    return -1;
  }
  PythonStreamReadMethod read_method = PythonStreamReadMethod::ReadInto;
  if (!resolve_python_stream_read_method(stream, &read_method, error_message)) {
    return -1;
  }
  try {
    auto input_stream =
        std::make_shared<PyInputStream>(stream, buffer_size, read_method);
    *out = new Buffer(*input_stream);
    return 0;
  } catch (const std::exception &e) {
    *error_message = e.what();
    return -1;
  }
}

int Fory_PyCreateOutputStream(PyObject *stream, OutputStream **out,
                              std::string *error_message) {
  if (stream == nullptr) {
    *error_message = "stream must not be null";
    return -1;
  }
  // See PyOutputStream::write_to_stream contract: the provided sink must not
  // retain passed write buffers after write(...) returns.
  if (!resolve_python_stream_write_method(stream, error_message)) {
    return -1;
  }
  try {
    *out = new PyOutputStream(stream, 4096);
    return 0;
  } catch (const std::exception &e) {
    *error_message = e.what();
    return -1;
  }
}

int Fory_PyBindBufferToOutputStream(Buffer *buffer, OutputStream *output_stream,
                                    std::string *error_message) {
  if (buffer == nullptr) {
    *error_message = "buffer must not be null";
    return -1;
  }
  if (output_stream == nullptr) {
    *error_message = "output stream must not be null";
    return -1;
  }
  buffer->bind_output_stream(output_stream);
  return 0;
}

int Fory_PyClearBufferOutputStream(Buffer *buffer, std::string *error_message) {
  if (buffer == nullptr) {
    *error_message = "buffer must not be null";
    return -1;
  }
  buffer->clear_output_stream();
  return 0;
}
} // namespace fory
