PARQUET-1095: [C++] Read and write Arrow decimal values

This depends on:
- [x] [ARROW-1607](https://github.com/apache/arrow/pull/1128)
- [x] [ARROW-1656](https://github.com/apache/arrow/pull/1184)
- [x] [ARROW-1588](https://github.com/apache/arrow/pull/1211)
- [x] Add tests for writing different sizes of values

Author: Phillip Cloud <cpcloud@gmail.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #403 from cpcloud/PARQUET-1095 and squashes the following commits:

8c3d222 [Phillip Cloud] Remove loop from BytesToInteger
63018bc [Wes McKinney] Suppress C4996 due to arrow/util/variant.h
e4b02d3 [Phillip Cloud] Refactor types.h
83948ec [Phillip Cloud] Add last_value_ init
51965cd [Phillip Cloud] Min commit that contains the unique kernel in arrow
e25c59b [Phillip Cloud] Fix reader writer test for unique kernel addition
da0a7eb [Phillip Cloud] Update for ARROW-1811
16935de [Phillip Cloud] Reverse operand order and explicit cast
6036ca5 [Phillip Cloud] ARROW-1811
c5c4294 [Phillip Cloud] Fix issues
32a4abe [Phillip Cloud] Cleanup iteration a bit
920832a [Phillip Cloud] Update arrow version
9f97c1d [Phillip Cloud] Update for ARROW-1794: rename DecimalArray to Decimal128Array
b2e0290 [Phillip Cloud] IWYU
64748a8 [Phillip Cloud] Copy from arrow for now
6c9e2a7 [Phillip Cloud] Reduce the number of decimal test cases
7ab2e5c [Phillip Cloud] Parameterize on precision
30655d6 [Phillip Cloud] Use arrow random_decimals
9ff7eb4 [Phillip Cloud] Remove specific template parameters
1eee6a9 [Phillip Cloud] Remove specific randint call
8808e4c [Phillip Cloud] Bump arrow version
659fbc1 [Phillip Cloud] Fix deprecated API call
e162ca1 [Phillip Cloud] Allocate scratch space to hold the byteswapped values
5c9292b [Phillip Cloud] Proper dcheck call
1782da0 [Phillip Cloud] Use arrow
3d243d5 [Phillip Cloud] Checkpoint [ci skip]
028fb03 [Phillip Cloud] Remove garbage values
46dff15 [Phillip Cloud] Clean up uint32 test
613255e [Phillip Cloud] Do not use std::copy when reinterpret_cast will suffice
2917a62 [Phillip Cloud] PARQUET-1095: [C++] Read and write Arrow decimal values
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5aa8b93..c524ceb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -460,6 +460,11 @@
   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CLANG_OPTIONS}")
 endif()
 
+if ("${COMPILER_FAMILY}" STREQUAL "msvc")
+  # MSVC version of -Wno-deprecated
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4996")
+endif()
+
 ############################################################
 # "make lint" target
 ############################################################
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index a470fc1..fe1d499 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -366,7 +366,7 @@
     -DARROW_BUILD_TESTS=OFF)
 
   if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
-    set(ARROW_VERSION "0e21f84c2fc26dba949a03ee7d7ebfade0a65b81")  # Arrow 0.7.1
+    set(ARROW_VERSION "f2806fa518583907a129b2ecb0b7ec8758b69e17")
   else()
     set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
   endif()
diff --git a/data/fixed_length_decimal.parquet b/data/fixed_length_decimal.parquet
new file mode 100644
index 0000000..69fce53
--- /dev/null
+++ b/data/fixed_length_decimal.parquet
Binary files differ
diff --git a/data/fixed_length_decimal_legacy.parquet b/data/fixed_length_decimal_legacy.parquet
new file mode 100644
index 0000000..b0df62a
--- /dev/null
+++ b/data/fixed_length_decimal_legacy.parquet
Binary files differ
diff --git a/data/int32_decimal.parquet b/data/int32_decimal.parquet
new file mode 100644
index 0000000..5bf2d4e
--- /dev/null
+++ b/data/int32_decimal.parquet
Binary files differ
diff --git a/data/int64_decimal.parquet b/data/int64_decimal.parquet
new file mode 100644
index 0000000..5043bca
--- /dev/null
+++ b/data/int64_decimal.parquet
Binary files differ
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index a18c565..0e0831e 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -24,6 +24,7 @@
 #include "gtest/gtest.h"
 
 #include <sstream>
+#include <arrow/compute/api.h>
 
 #include "parquet/api/reader.h"
 #include "parquet/api/writer.h"
@@ -37,13 +38,13 @@
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
+#include "arrow/util/decimal.h"
 
 using arrow::Array;
 using arrow::ArrayVisitor;
 using arrow::Buffer;
 using arrow::ChunkedArray;
 using arrow::Column;
-using arrow::EncodeArrayToDictionary;
 using arrow::ListArray;
 using arrow::PoolBuffer;
 using arrow::PrimitiveArray;
@@ -51,6 +52,9 @@
 using arrow::Table;
 using arrow::TimeUnit;
 using arrow::default_memory_pool;
+using arrow::compute::DictionaryEncode;
+using arrow::compute::FunctionContext;
+using arrow::compute::Datum;
 using arrow::io::BufferReader;
 
 using arrow::test::randint;
@@ -68,10 +72,10 @@
 namespace parquet {
 namespace arrow {
 
-const int SMALL_SIZE = 100;
-const int LARGE_SIZE = 10000;
+static constexpr int SMALL_SIZE = 100;
+static constexpr int LARGE_SIZE = 10000;
 
-constexpr uint32_t kDefaultSeed = 0;
+static constexpr uint32_t kDefaultSeed = 0;
 
 LogicalType::type get_logical_type(const ::arrow::DataType& type) {
   switch (type.id()) {
@@ -118,6 +122,8 @@
           static_cast<const ::arrow::DictionaryType&>(type);
       return get_logical_type(*dict_type.dictionary()->type());
     }
+    case ArrowId::DECIMAL:
+      return LogicalType::DECIMAL;
     default:
       break;
   }
@@ -147,6 +153,7 @@
     case ArrowId::STRING:
       return ParquetType::BYTE_ARRAY;
     case ArrowId::FIXED_SIZE_BINARY:
+    case ArrowId::DECIMAL:
       return ParquetType::FIXED_LEN_BYTE_ARRAY;
     case ArrowId::DATE32:
       return ParquetType::INT32;
@@ -299,6 +306,7 @@
 const std::string test_traits<::arrow::StringType>::value("Test");              // NOLINT
 const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03");  // NOLINT
 const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed");    // NOLINT
+
 template <typename T>
 using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
 
@@ -342,28 +350,44 @@
 
 static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType& type,
                                                    Repetition::type repetition) {
-  int byte_width;
-  // Decimal is not implemented yet.
+  int32_t byte_width = -1;
+  int32_t precision = -1;
+  int32_t scale = -1;
+
   switch (type.id()) {
     case ::arrow::Type::DICTIONARY: {
-      const ::arrow::DictionaryType& dict_type =
-          static_cast<const ::arrow::DictionaryType&>(type);
+      const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
       const ::arrow::DataType& values_type = *dict_type.dictionary()->type();
-      if (values_type.id() == ::arrow::Type::FIXED_SIZE_BINARY) {
-        byte_width =
-            static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
-      } else {
-        byte_width = -1;
+      switch (values_type.id()) {
+        case ::arrow::Type::FIXED_SIZE_BINARY:
+          byte_width =
+              static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
+          break;
+        case ::arrow::Type::DECIMAL: {
+          const auto& decimal_type =
+              static_cast<const ::arrow::Decimal128Type&>(values_type);
+          precision = decimal_type.precision();
+          scale = decimal_type.scale();
+          byte_width = DecimalSize(precision);
+        } break;
+        default:
+          break;
       }
     } break;
     case ::arrow::Type::FIXED_SIZE_BINARY:
       byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
       break;
+    case ::arrow::Type::DECIMAL: {
+      const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
+      precision = decimal_type.precision();
+      scale = decimal_type.scale();
+      byte_width = DecimalSize(precision);
+    } break;
     default:
-      byte_width = -1;
+      break;
   }
   auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
-                                   get_logical_type(type), byte_width);
+                                   get_logical_type(type), byte_width, precision, scale);
   NodePtr node_ =
       GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
   return std::static_pointer_cast<GroupNode>(node_);
@@ -371,7 +395,7 @@
 
 namespace internal {
 
-void AssertArraysEqual(const Array &expected, const Array &actual) {
+void AssertArraysEqual(const Array& expected, const Array& actual) {
   if (!actual.Equals(expected)) {
     std::stringstream pp_result;
     std::stringstream pp_expected;
@@ -526,11 +550,19 @@
 // There we write an UInt32 Array but receive an Int64 Array as result for
 // Parquet version 1.0.
 
-typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
-                         ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type,
-                         ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type,
-                         ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
-                         ::arrow::BinaryType, ::arrow::FixedSizeBinaryType>
+typedef ::testing::Types<
+    ::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type,
+    ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type,
+    ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
+    ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>,
+    DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>,
+    DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>,
+    DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>,
+    DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>,
+    DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>,
+    DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>,
+    DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>,
+    DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>>
     TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);
@@ -590,8 +622,10 @@
 
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
 
-  std::shared_ptr<Array> dict_values;
-  ASSERT_OK(EncodeArrayToDictionary(*values, default_memory_pool(), &dict_values));
+  Datum out;
+  FunctionContext ctx(default_memory_pool());
+  ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out));
+  std::shared_ptr<Array> dict_values = MakeArray(out.array());
   std::shared_ptr<GroupNode> schema =
       MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
   this->WriteColumn(schema, dict_values);
@@ -856,25 +890,43 @@
   ASSERT_OK_NO_THROW(
       WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties));
 
-  std::shared_ptr<Array> expected_values;
   std::shared_ptr<PoolBuffer> int64_data =
       std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
   {
     ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
-    int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
-    const uint32_t* uint32_data_ptr =
-        reinterpret_cast<const uint32_t*>(values->values()->data());
-    // std::copy might be faster but this is explicit on the casts)
-    for (int64_t i = 0; i < values->length(); i++) {
-      int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
-    }
+    auto int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
+    auto uint32_data_ptr = reinterpret_cast<const uint32_t*>(values->values()->data());
+    const auto cast_uint32_to_int64 = [](uint32_t value) {
+      return static_cast<int64_t>(value);
+    };
+    std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), int64_data_ptr,
+                   cast_uint32_to_int64);
   }
 
   std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), int64_data};
   auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(),
                                                        buffers, values->null_count());
-  ASSERT_OK(MakeArray(arr_data, &expected_values));
-  this->ReadAndCheckSingleColumnTable(expected_values);
+  std::shared_ptr<Array> expected_values = MakeArray(arr_data);
+  ASSERT_NE(expected_values, NULLPTR);
+
+  const auto& expected = static_cast<const ::arrow::Int64Array&>(*expected_values);
+  ASSERT_GT(values->length(), 0);
+  ASSERT_EQ(values->length(), expected.length());
+
+  // TODO(phillipc): Is there a better way to compare these two arrays?
+  // AssertArraysEqual requires the same type, but we only care about values in this case
+  for (int i = 0; i < expected.length(); ++i) {
+    const bool value_is_valid = values->IsValid(i);
+    const bool expected_value_is_valid = expected.IsValid(i);
+
+    ASSERT_EQ(expected_value_is_valid, value_is_valid);
+
+    if (value_is_valid) {
+      uint32_t value = values->Value(i);
+      int64_t expected_value = expected.Value(i);
+      ASSERT_EQ(expected_value, static_cast<int64_t>(value));
+    }
+  }
 }
 
 using TestStringParquetIO = TestParquetIO<::arrow::StringType>;
@@ -1432,7 +1484,7 @@
   offset_values.push_back(total_elements);
 
   std::vector<int8_t> value_draws;
-  randint<int8_t>(total_elements, 0, 100, &value_draws);
+  randint(total_elements, 0, 100, &value_draws);
 
   std::vector<bool> is_valid;
   random_is_valid(total_elements, 0.1, &is_valid);
@@ -1889,6 +1941,61 @@
   ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
 }
 
+class TestArrowReaderAdHocSpark
+    : public ::testing::TestWithParam<
+          std::tuple<std::string, std::shared_ptr<::arrow::DataType>>> {};
+
+TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
+  std::string path(std::getenv("PARQUET_TEST_DATA"));
+
+  std::string filename;
+  std::shared_ptr<::arrow::DataType> decimal_type;
+  std::tie(filename, decimal_type) = GetParam();
+
+  path += "/" + filename;
+  ASSERT_GT(path.size(), 0);
+
+  auto pool = ::arrow::default_memory_pool();
+
+  std::unique_ptr<FileReader> arrow_reader;
+  ASSERT_NO_THROW(
+      arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false))));
+  std::shared_ptr<::arrow::Table> table;
+  ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
+
+  ASSERT_EQ(1, table->num_columns());
+
+  constexpr int32_t expected_length = 24;
+
+  auto value_column = table->column(0);
+  ASSERT_EQ(expected_length, value_column->length());
+
+  auto raw_array = value_column->data();
+  ASSERT_EQ(1, raw_array->num_chunks());
+
+  auto chunk = raw_array->chunk(0);
+
+  std::shared_ptr<Array> expected_array;
+
+  ::arrow::Decimal128Builder builder(decimal_type, pool);
+
+  for (int32_t i = 0; i < expected_length; ++i) {
+    ::arrow::Decimal128 value((i + 1) * 100);
+    ASSERT_OK(builder.Append(value));
+  }
+  ASSERT_OK(builder.Finish(&expected_array));
+
+  internal::AssertArraysEqual(*expected_array, *chunk);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    ReadDecimals, TestArrowReaderAdHocSpark,
+    ::testing::Values(
+        std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)),
+        std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)),
+        std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)),
+        std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2))));
+
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index a7a62c5..7ed9ad8 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -51,7 +51,7 @@
 const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO);
 const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO);
 const auto BINARY = ::arrow::binary();
-const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
+const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4);
 
 class TestConvertParquetSchema : public ::testing::Test {
  public:
@@ -62,8 +62,8 @@
     for (int i = 0; i < expected_schema->num_fields(); ++i) {
       auto lhs = result_schema_->field(i);
       auto rhs = expected_schema->field(i);
-      EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
-                                    << " != " << rhs->ToString();
+      EXPECT_TRUE(lhs->Equals(rhs))
+          << i << " " << lhs->ToString() << " != " << rhs->ToString();
     }
   }
 
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 5edc837..3ca49cb 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -29,6 +29,7 @@
 
 #include "arrow/api.h"
 #include "arrow/util/bit-util.h"
+#include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/parallel.h"
 
@@ -716,7 +717,8 @@
 
 template <typename ArrowType, typename ParquetType>
 using supports_fast_path =
-    typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
+    typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value,
+                            ParquetType>::type;
 
 template <typename ArrowType, typename ParquetType, typename Enable = void>
 struct TransferFunctor {
@@ -868,12 +870,248 @@
       // Convert from BINARY type to STRING
       auto new_data = (*out)->data()->ShallowCopy();
       new_data->type = type;
-      RETURN_NOT_OK(::arrow::MakeArray(new_data, out));
+      *out = ::arrow::MakeArray(new_data);
     }
     return Status::OK();
   }
 };
 
+static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
+  using ::arrow::BitUtil::FromBigEndian;
+
+  const int32_t length = stop - start;
+
+  DCHECK_GE(length, 0);
+  DCHECK_LE(length, 8);
+
+  switch (length) {
+    case 0:
+      return 0;
+    case 1:
+      return bytes[start];
+    case 2:
+      return FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start));
+    case 3: {
+      const uint64_t first_two_bytes =
+          FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start));
+      const uint64_t last_byte = bytes[stop - 1];
+      return first_two_bytes << 8 | last_byte;
+    }
+    case 4:
+      return FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+    case 5: {
+      const uint64_t first_four_bytes =
+          FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+      const uint64_t last_byte = bytes[stop - 1];
+      return first_four_bytes << 8 | last_byte;
+    }
+    case 6: {
+      const uint64_t first_four_bytes =
+          FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+      const uint64_t last_two_bytes =
+          FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4));
+      return first_four_bytes << 16 | last_two_bytes;
+    }
+    case 7: {
+      const uint64_t first_four_bytes =
+          FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+      const uint64_t second_two_bytes =
+          FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4));
+      const uint64_t last_byte = bytes[stop - 1];
+      return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
+    }
+    case 8:
+      return FromBigEndian(*reinterpret_cast<const uint64_t*>(bytes + start));
+    default: {
+      DCHECK(false);
+      return UINT64_MAX;
+    }
+  }
+}
+
+static constexpr int32_t kMinDecimalBytes = 1;
+static constexpr int32_t kMaxDecimalBytes = 16;
+
+/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
+/// uint64_t (low bits).
+static void BytesToIntegerPair(const uint8_t* bytes,
+                               const int32_t total_number_of_bytes_used, int64_t* high,
+                               uint64_t* low) {
+  DCHECK_GE(total_number_of_bytes_used, kMinDecimalBytes);
+  DCHECK_LE(total_number_of_bytes_used, kMaxDecimalBytes);
+
+  /// Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
+  /// sign bit.
+  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
+
+  /// Sign extend the low bits if necessary
+  *low = UINT64_MAX * (is_negative && total_number_of_bytes_used < 8);
+  *high = -1 * (is_negative && total_number_of_bytes_used < kMaxDecimalBytes);
+
+  /// Stop byte of the high bytes
+  const int32_t high_bits_offset = std::max(0, total_number_of_bytes_used - 8);
+
+  /// Shift left enough bits to make room for the incoming int64_t
+  *high <<= high_bits_offset * CHAR_BIT;
+
+  /// Preserve the upper bits by inplace OR-ing the int64_t
+  *high |= BytesToInteger(bytes, 0, high_bits_offset);
+
+  /// Stop byte of the low bytes
+  const int32_t low_bits_offset = std::min(total_number_of_bytes_used, 8);
+
+  /// Shift left enough bits to make room for the incoming uint64_t
+  *low <<= low_bits_offset * CHAR_BIT;
+
+  /// Preserve the upper bits by inplace OR-ing the uint64_t
+  *low |= BytesToInteger(bytes, high_bits_offset, total_number_of_bytes_used);
+}
+
+static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
+                                          uint8_t* out_buf) {
+  // view the first 8 bytes as an unsigned 64-bit integer
+  auto low = reinterpret_cast<uint64_t*>(out_buf);
+
+  // view the second 8 bytes as a signed 64-bit integer
+  auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
+
+  // Convert the fixed size binary array bytes into a Decimal128 compatible layout
+  BytesToIntegerPair(value, byte_width, high, low);
+}
+
+/// \brief Convert an array of FixedLenByteArrays to an arrow::Decimal128Array
+/// We do this by:
+/// 1. Creating a arrow::FixedSizeBinaryArray from the RecordReader's builder
+/// 2. Allocating a buffer for the arrow::Decimal128Array
+/// 3. Converting the big-endian bytes in the FixedSizeBinaryArray to two integers
+///    representing the high and low bits of each decimal value.
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, FLBAType> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+
+    // Finish the built data into a temporary array
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(reader->builder()->Finish(&array));
+    const auto& fixed_size_binary_array =
+        static_cast<const ::arrow::FixedSizeBinaryArray&>(*array);
+
+    // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
+    // this will be different from the decimal array width because we write the minimum
+    // number of bytes necessary to represent a given precision
+    const int32_t byte_width =
+        static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
+            .byte_width();
+
+    // The byte width of each decimal value
+    const int32_t type_length =
+        static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+    // number of elements in the entire array
+    const int64_t length = fixed_size_binary_array.length();
+
+    // allocate memory for the decimal array
+    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+
+    // raw bytes that we can write to
+    uint8_t* out_ptr = data->mutable_data();
+
+    // convert each FixedSizeBinary value to valid decimal bytes
+    const int64_t null_count = fixed_size_binary_array.null_count();
+    if (null_count > 0) {
+      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+        if (!fixed_size_binary_array.IsNull(i)) {
+          RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width,
+                                 out_ptr);
+        }
+      }
+    } else {
+      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+        RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
+      }
+    }
+
+    *out = std::make_shared<::arrow::Decimal128Array>(
+        type, length, data, fixed_size_binary_array.null_bitmap(), null_count);
+    return Status::OK();
+  }
+};
+
+/// \brief Convert an Int32 or Int64 array into a Decimal128Array
+/// The parquet spec allows systems to write decimals in int32, int64 if the values are
+/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
+/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
+template <typename ParquetIntegerType,
+          typename = typename std::enable_if<
+              std::is_same<ParquetIntegerType, Int32Type>::value ||
+              std::is_same<ParquetIntegerType, Int64Type>::value>::type>
+static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
+                                     const std::shared_ptr<::arrow::DataType>& type,
+                                     std::shared_ptr<Array>* out) {
+  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+
+  const int64_t length = reader->values_written();
+
+  using ElementType = typename ParquetIntegerType::c_type;
+  static_assert(std::is_same<ElementType, int32_t>::value ||
+                    std::is_same<ElementType, int64_t>::value,
+                "ElementType must be int32_t or int64_t");
+
+  const auto values = reinterpret_cast<const ElementType*>(reader->values());
+
+  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+  const int64_t type_length = decimal_type.byte_width();
+
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+  uint8_t* out_ptr = data->mutable_data();
+
+  using ::arrow::BitUtil::FromLittleEndian;
+
+  for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+    // sign/zero extend int32_t values, otherwise a no-op
+    const auto value = static_cast<int64_t>(values[i]);
+
+    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
+
+    // No-op on little endian machines, byteswap on big endian
+    out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value));
+
+    // no need to byteswap here because we're sign/zero extending exactly 8 bytes
+    out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0);
+  }
+
+  if (reader->nullable_values()) {
+    std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+    *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid,
+                                                      reader->null_count());
+  } else {
+    *out = std::make_shared<::arrow::Decimal128Array>(type, length, data);
+  }
+  return Status::OK();
+}
+
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, Int32Type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    return DecimalIntegerTransfer<Int32Type>(reader, pool, type, out);
+  }
+};
+
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, Int64Type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    return DecimalIntegerTransfer<Int64Type>(reader, pool, type, out);
+  }
+};
+
 #define TRANSFER_DATA(ArrowType, ParquetType)                            \
   TransferFunctor<ArrowType, ParquetType> func;                          \
   RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \
@@ -932,6 +1170,22 @@
     TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
     TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
     TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
+    case ::arrow::Type::DECIMAL: {
+      switch (descr_->physical_type()) {
+        case ::parquet::Type::INT32: {
+          TRANSFER_DATA(::arrow::Decimal128Type, Int32Type);
+        } break;
+        case ::parquet::Type::INT64: {
+          TRANSFER_DATA(::arrow::Decimal128Type, Int64Type);
+        } break;
+        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
+          TRANSFER_DATA(::arrow::Decimal128Type, FLBAType);
+        } break;
+        default:
+          return Status::Invalid(
+              "Physical type for decimal must be int32, int64, or fixed length binary");
+      }
+    } break;
     case ::arrow::Type::TIMESTAMP: {
       ::arrow::TimestampType* timestamp_type =
           static_cast<::arrow::TimestampType*>(field_->type().get());
@@ -946,8 +1200,7 @@
         default:
           return Status::NotImplemented("TimeUnit not supported");
       }
-      break;
-    }
+    } break;
       TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
       TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
     default:
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
index 7275d2f..6405ee7 100644
--- a/src/parquet/arrow/record_reader.cc
+++ b/src/parquet/arrow/record_reader.cc
@@ -83,7 +83,7 @@
     Reset();
   }
 
-  virtual ~RecordReaderImpl() {}
+  virtual ~RecordReaderImpl() = default;
 
   virtual int64_t ReadRecords(int64_t num_records) = 0;
 
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index e16a1af..41da9fb 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -25,6 +25,7 @@
 #include "parquet/util/schema-util.h"
 
 #include "arrow/api.h"
+#include "arrow/util/logging.h"
 
 using arrow::Field;
 using arrow::Status;
@@ -49,10 +50,9 @@
 const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
 const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
 
-TypePtr MakeDecimalType(const PrimitiveNode& node) {
-  int precision = node.decimal_metadata().precision;
-  int scale = node.decimal_metadata().scale;
-  return std::make_shared<::arrow::DecimalType>(precision, scale);
+TypePtr MakeDecimal128Type(const PrimitiveNode& node) {
+  const auto& metadata = node.decimal_metadata();
+  return ::arrow::decimal(metadata.precision, metadata.scale);
 }
 
 static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) {
@@ -61,7 +61,7 @@
       *out = ::arrow::utf8();
       break;
     case LogicalType::DECIMAL:
-      *out = MakeDecimalType(node);
+      *out = MakeDecimal128Type(node);
       break;
     default:
       // BINARY
@@ -77,7 +77,7 @@
       *out = ::arrow::fixed_size_binary(node.type_length());
       break;
     case LogicalType::DECIMAL:
-      *out = MakeDecimalType(node);
+      *out = MakeDecimal128Type(node);
       break;
     default:
       std::stringstream ss;
@@ -120,7 +120,7 @@
       *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
       break;
     case LogicalType::DECIMAL:
-      *out = MakeDecimalType(node);
+      *out = MakeDecimal128Type(node);
       break;
     default:
       std::stringstream ss;
@@ -144,7 +144,7 @@
       *out = ::arrow::uint64();
       break;
     case LogicalType::DECIMAL:
-      *out = MakeDecimalType(node);
+      *out = MakeDecimal128Type(node);
       break;
     case LogicalType::TIMESTAMP_MILLIS:
       *out = TIMESTAMP_MS;
@@ -473,7 +473,10 @@
   ParquetType::type type;
   Repetition::type repetition =
       field->nullable() ? Repetition::OPTIONAL : Repetition::REQUIRED;
+
   int length = -1;
+  int precision = -1;
+  int scale = -1;
 
   switch (field->type()->id()) {
     case ArrowType::NA:
@@ -532,9 +535,18 @@
       break;
     case ArrowType::FIXED_SIZE_BINARY: {
       type = ParquetType::FIXED_LEN_BYTE_ARRAY;
-      auto fixed_size_binary_type =
-          static_cast<::arrow::FixedSizeBinaryType*>(field->type().get());
-      length = fixed_size_binary_type->byte_width();
+      const auto& fixed_size_binary_type =
+          static_cast<const ::arrow::FixedSizeBinaryType&>(*field->type());
+      length = fixed_size_binary_type.byte_width();
+    } break;
+    case ArrowType::DECIMAL: {
+      type = ParquetType::FIXED_LEN_BYTE_ARRAY;
+      logical_type = LogicalType::DECIMAL;
+      const auto& decimal_type =
+          static_cast<const ::arrow::Decimal128Type&>(*field->type());
+      precision = decimal_type.precision();
+      scale = decimal_type.scale();
+      length = DecimalSize(precision);
     } break;
     case ArrowType::DATE32:
       type = ParquetType::INT32;
@@ -565,12 +577,12 @@
       auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
       return StructToNode(struct_type, field->name(), field->nullable(), properties,
                           arrow_properties, out);
-    } break;
+    }
     case ArrowType::LIST: {
       auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type());
       return ListToNode(list_type, field->name(), field->nullable(), properties,
                         arrow_properties, out);
-    } break;
+    }
     case ArrowType::DICTIONARY: {
       // Parquet has no Dictionary type, dictionary-encoded is handled on
       // the encoding, not the schema level.
@@ -582,14 +594,15 @@
       return FieldToNode(unpacked_field, properties, arrow_properties, out);
     }
     default: {
-      // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
+      // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR
       std::stringstream ss;
       ss << "Unhandled type for Arrow to Parquet schema conversion: ";
       ss << field->type()->ToString();
       return Status::NotImplemented(ss.str());
     }
   }
-  *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, length);
+  *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, length,
+                             precision, scale);
   return Status::OK();
 }
 
@@ -617,5 +630,77 @@
                          out);
 }
 
+/// \brief Compute the number of bytes required to represent a decimal of a
+/// given precision. Taken from the Apache Impala codebase. The comments next
+/// to the return values are the maximum value that can be represented in 2's
+/// complement with the returned number of bytes.
+int32_t DecimalSize(int32_t precision) {
+  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
+                          << precision;
+  DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
+                           << precision;
+
+  switch (precision) {
+    case 1:
+    case 2:
+      return 1;  // 127
+    case 3:
+    case 4:
+      return 2;  // 32,767
+    case 5:
+    case 6:
+      return 3;  // 8,388,607
+    case 7:
+    case 8:
+    case 9:
+      return 4;  // 2,147,483,427
+    case 10:
+    case 11:
+      return 5;  // 549,755,813,887
+    case 12:
+    case 13:
+    case 14:
+      return 6;  // 140,737,488,355,327
+    case 15:
+    case 16:
+      return 7;  // 36,028,797,018,963,967
+    case 17:
+    case 18:
+      return 8;  // 9,223,372,036,854,775,807
+    case 19:
+    case 20:
+    case 21:
+      return 9;  // 2,361,183,241,434,822,606,847
+    case 22:
+    case 23:
+      return 10;  // 604,462,909,807,314,587,353,087
+    case 24:
+    case 25:
+    case 26:
+      return 11;  // 154,742,504,910,672,534,362,390,527
+    case 27:
+    case 28:
+      return 12;  // 39,614,081,257,132,168,796,771,975,167
+    case 29:
+    case 30:
+    case 31:
+      return 13;  // 10,141,204,801,825,835,211,973,625,643,007
+    case 32:
+    case 33:
+      return 14;  // 2,596,148,429,267,413,814,265,248,164,610,047
+    case 34:
+    case 35:
+      return 15;  // 664,613,997,892,457,936,451,903,530,140,172,287
+    case 36:
+    case 37:
+    case 38:
+      return 16;  // 170,141,183,460,469,231,731,687,303,715,884,105,727
+    default:
+      DCHECK(false);
+      break;
+  }
+  return -1;
+}
+
 }  // namespace arrow
 }  // namespace parquet
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index de153eb..3b212da 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -85,6 +85,8 @@
                                                const WriterProperties& properties,
                                                std::shared_ptr<SchemaDescriptor>* out);
 
+int32_t DecimalSize(int32_t precision);
+
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 954a84f..8611a30 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <limits>
 #include <string>
 #include <vector>
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
 #include "arrow/type_traits.h"
+#include "arrow/util/decimal.h"
 
 namespace parquet {
 namespace arrow {
@@ -28,6 +30,16 @@
 using ::arrow::Array;
 using ::arrow::Status;
 
+template <int32_t PRECISION>
+struct DecimalWithPrecisionAndScale {
+  static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value");
+
+  using type = ::arrow::Decimal128Type;
+  static constexpr ::arrow::Type::type type_id = ::arrow::Decimal128Type::type_id;
+  static constexpr int32_t precision = PRECISION;
+  static constexpr int32_t scale = PRECISION - 1;
+};
+
 template <typename ArrowType>
 using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
 
@@ -52,8 +64,10 @@
 template <class ArrowType>
 typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
-  std::vector<typename ArrowType::c_type> values;
-  ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values);
+  using c_type = typename ArrowType::c_type;
+  std::vector<c_type> values;
+  ::arrow::test::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1),
+                             &values);
   ::arrow::NumericBuilder<ArrowType> builder;
   RETURN_NOT_OK(builder.Append(values.data(), values.size()));
   return builder.Finish(out);
@@ -64,7 +78,7 @@
     is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
 NonNullArray(size_t size, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
-  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  ::arrow::test::randint(size, 0, 64, &values);
 
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
@@ -77,7 +91,7 @@
 typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
-  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  ::arrow::test::randint(size, 0, 64, &values);
   for (size_t i = 0; i < size; i++) {
     values[i] *= 86400000;
   }
@@ -114,11 +128,54 @@
   return builder.Finish(out);
 }
 
+static inline void random_decimals(int64_t n, uint32_t seed, int32_t precision,
+                                   uint8_t* out) {
+  std::mt19937 gen(seed);
+  std::uniform_int_distribution<uint32_t> d(0, std::numeric_limits<uint8_t>::max());
+  const int32_t required_bytes = DecimalSize(precision);
+  constexpr int32_t byte_width = 16;
+  std::fill(out, out + byte_width * n, '\0');
+
+  for (int64_t i = 0; i < n; ++i, out += byte_width) {
+    std::generate(out, out + required_bytes,
+                  [&d, &gen] { return static_cast<uint8_t>(d(gen)); });
+
+    // sign extend if the sign bit is set for the last byte generated
+    // 0b10000000 == 0x80 == 128
+    if ((out[required_bytes - 1] & '\x80') != 0) {
+      std::fill(out + required_bytes, out + byte_width, '\xFF');
+    }
+  }
+}
+
+template <typename ArrowType, int32_t precision = ArrowType::precision>
+typename std::enable_if<
+    std::is_same<ArrowType, DecimalWithPrecisionAndScale<precision>>::value, Status>::type
+NonNullArray(size_t size, std::shared_ptr<Array>* out) {
+  constexpr int32_t kDecimalPrecision = precision;
+  constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale<precision>::scale;
+
+  const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale);
+  ::arrow::Decimal128Builder builder(type);
+  const int32_t byte_width =
+      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+  constexpr int32_t seed = 0;
+
+  std::shared_ptr<Buffer> out_buf;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width,
+                                        &out_buf));
+  random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
+
+  RETURN_NOT_OK(builder.Append(out_buf->data(), size));
+  return builder.Finish(out);
+}
+
 template <class ArrowType>
 typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
   std::vector<uint8_t> values;
-  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  ::arrow::test::randint(size, 0, 1, &values);
   ::arrow::BooleanBuilder builder;
   RETURN_NOT_OK(builder.Append(values.data(), values.size()));
   return builder.Finish(out);
@@ -128,9 +185,10 @@
 template <typename ArrowType>
 typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NullableArray(
     size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
-  std::vector<typename ArrowType::c_type> values;
-  ::arrow::test::random_real<typename ArrowType::c_type>(size, seed, -1e10, 1e10,
-                                                         &values);
+  using c_type = typename ArrowType::c_type;
+  std::vector<c_type> values;
+  ::arrow::test::random_real(size, seed, static_cast<c_type>(-1e10),
+                             static_cast<c_type>(1e10), &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
@@ -151,7 +209,7 @@
 
   // Seed is random in Arrow right now
   (void)seed;
-  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  ::arrow::test::randint(size, 0, 64, &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
@@ -172,7 +230,7 @@
 
   // Seed is random in Arrow right now
   (void)seed;
-  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  ::arrow::test::randint(size, 0, 64, &values);
   for (size_t i = 0; i < size; i++) {
     values[i] *= 86400000;
   }
@@ -246,6 +304,34 @@
   return builder.Finish(out);
 }
 
+template <typename ArrowType, int32_t precision = ArrowType::precision>
+typename std::enable_if<
+    std::is_same<ArrowType, DecimalWithPrecisionAndScale<precision>>::value, Status>::type
+NullableArray(size_t size, size_t num_nulls, uint32_t seed,
+              std::shared_ptr<::arrow::Array>* out) {
+  std::vector<uint8_t> valid_bytes(size, '\1');
+
+  for (size_t i = 0; i < num_nulls; ++i) {
+    valid_bytes[i * 2] = '\0';
+  }
+
+  constexpr int32_t kDecimalPrecision = precision;
+  constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale<precision>::scale;
+  const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale);
+  const int32_t byte_width =
+      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+  std::shared_ptr<::arrow::Buffer> out_buf;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width,
+                                        &out_buf));
+
+  random_decimals(size, seed, precision, out_buf->mutable_data());
+
+  ::arrow::Decimal128Builder builder(type);
+  RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data()));
+  return builder.Finish(out);
+}
+
 // This helper function only supports (size/2) nulls yet.
 template <class ArrowType>
 typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(
@@ -255,7 +341,7 @@
   // Seed is random in Arrow right now
   (void)seed;
 
-  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  ::arrow::test::randint(size, 0, 1, &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index b53c1ca..1f3fc7e 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -32,6 +32,7 @@
 using arrow::Array;
 using arrow::BinaryArray;
 using arrow::FixedSizeBinaryArray;
+using arrow::Decimal128Array;
 using arrow::BooleanArray;
 using arrow::Int16Array;
 using arrow::Int16Builder;
@@ -104,7 +105,6 @@
 
   NOT_IMPLEMENTED_VISIT(Struct)
   NOT_IMPLEMENTED_VISIT(Union)
-  NOT_IMPLEMENTED_VISIT(Decimal)
   NOT_IMPLEMENTED_VISIT(Dictionary)
   NOT_IMPLEMENTED_VISIT(Interval)
 
@@ -743,8 +743,6 @@
       buffer_ptr[i] =
           ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
     }
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   } else {
     int buffer_idx = 0;
     for (int64_t i = 0; i < data->length(); i++) {
@@ -753,9 +751,9 @@
             ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
       }
     }
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
 }
@@ -765,29 +763,82 @@
     ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
     const int16_t* def_levels, const int16_t* rep_levels) {
   RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false));
-  auto data = static_cast<const FixedSizeBinaryArray*>(array.get());
+  const auto& data = static_cast<const FixedSizeBinaryArray&>(*array);
+  const int64_t length = data.length();
+
   auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
 
   auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
 
-  if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
+  if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) {
     // no nulls, just dump the data
     // todo(advancedxy): use a writeBatch to avoid this step
-    for (int64_t i = 0; i < data->length(); i++) {
-      buffer_ptr[i] = FixedLenByteArray(data->GetValue(i));
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = FixedLenByteArray(data.GetValue(i));
     }
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   } else {
     int buffer_idx = 0;
-    for (int64_t i = 0; i < data->length(); i++) {
-      if (!data->IsNull(i)) {
-        buffer_ptr[buffer_idx++] = FixedLenByteArray(data->GetValue(i));
+    for (int64_t i = 0; i < length; i++) {
+      if (!data.IsNull(i)) {
+        buffer_ptr[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
       }
     }
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+template <>
+Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
+    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels) {
+  const auto& data = static_cast<const Decimal128Array&>(*array);
+  const int64_t length = data.length();
+
+  // TODO(phillipc): This is potentially very wasteful if we have a lot of nulls
+  std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
+
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false));
+  auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
+
+  auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
+
+  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
+  const int32_t offset =
+      decimal_type.byte_width() - DecimalSize(decimal_type.precision());
+
+  const bool does_not_have_nulls =
+      writer->descr()->schema_node()->is_required() || data.null_count() == 0;
+
+  // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
+  // don't have to keep writing two loops to handle the case where we know there are no
+  // nulls
+  if (does_not_have_nulls) {
+    // no nulls, just dump the data
+    // todo(advancedxy): use a writeBatch to avoid this step
+    for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
+      auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
+      big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+      big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+      buffer_ptr[i] = FixedLenByteArray(
+          reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+    }
+  } else {
+    for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
+      if (!data.IsNull(i)) {
+        auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
+        big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+        big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+        buffer_ptr[buffer_idx++] = FixedLenByteArray(
+            reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+        j += 2;
+      }
+    }
+  }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
 }
@@ -896,6 +947,7 @@
       WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
       WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
       WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
+      WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType)
       WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
       WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
       WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index be38752..3284aca 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -420,11 +420,9 @@
 
   PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false));
   uint8_t* bytes_data = byte_array_data_->mutable_data();
-  int offset = 0;
-  for (int i = 0; i < num_dictionary_values; ++i) {
+  for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += fixed_len) {
     memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len);
     dictionary_[i].ptr = bytes_data + offset;
-    offset += fixed_len;
   }
 }
 
@@ -597,7 +595,7 @@
 template <>
 inline int DictEncoder<ByteArrayType>::Hash(const ByteArray& value) const {
   if (value.len > 0) {
-    DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL";
+    DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL";
   }
   return HashUtil::Hash(value.ptr, value.len, 0);
 }
@@ -605,7 +603,7 @@
 template <>
 inline int DictEncoder<FLBAType>::Hash(const FixedLenByteArray& value) const {
   if (type_length_ > 0) {
-    DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL";
+    DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL";
   }
   return HashUtil::Hash(value.ptr, type_length_, 0);
 }
@@ -923,7 +921,8 @@
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : Decoder<ByteArrayType>(descr, Encoding::DELTA_BYTE_ARRAY),
         prefix_len_decoder_(nullptr, pool),
-        suffix_decoder_(nullptr, pool) {}
+        suffix_decoder_(nullptr, pool),
+        last_value_(0, nullptr) {}
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 9b9bde9..4ec48a4 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -45,9 +45,9 @@
     : contents_(std::move(contents)) {}
 
 std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns())
+      << "The RowGroup only has " << metadata()->num_columns()
+      << "columns, requested column: " << i;
   const ColumnDescriptor* descr = metadata()->schema()->Column(i);
 
   std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
@@ -57,9 +57,9 @@
 }
 
 std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns())
+      << "The RowGroup only has " << metadata()->num_columns()
+      << "columns, requested column: " << i;
   return contents_->GetColumnPageReader(i);
 }
 
@@ -127,9 +127,9 @@
 }
 
 std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
-  DCHECK(i < metadata()->num_row_groups()) << "The file only has "
-                                           << metadata()->num_row_groups()
-                                           << "row groups, requested reader for: " << i;
+  DCHECK(i < metadata()->num_row_groups())
+      << "The file only has " << metadata()->num_row_groups()
+      << "row groups, requested reader for: " << i;
   return contents_->GetRowGroup(i);
 }
 
diff --git a/src/parquet/types.h b/src/parquet/types.h
index af3a58f..53b33d5 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -21,6 +21,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <iterator>
 #include <sstream>
 #include <string>
 
@@ -136,79 +137,63 @@
   ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {}
   uint32_t len;
   const uint8_t* ptr;
-
-  bool operator==(const ByteArray& other) const {
-    return this->len == other.len && 0 == memcmp(this->ptr, other.ptr, this->len);
-  }
-
-  bool operator!=(const ByteArray& other) const {
-    return this->len != other.len || 0 != memcmp(this->ptr, other.ptr, this->len);
-  }
 };
 
+inline bool operator==(const ByteArray& left, const ByteArray& right) {
+  return left.len == right.len && std::equal(left.ptr, left.ptr + left.len, right.ptr);
+}
+
+inline bool operator!=(const ByteArray& left, const ByteArray& right) {
+  return !(left == right);
+}
+
 struct FixedLenByteArray {
   FixedLenByteArray() : ptr(nullptr) {}
   explicit FixedLenByteArray(const uint8_t* ptr) : ptr(ptr) {}
   const uint8_t* ptr;
 };
 
-typedef FixedLenByteArray FLBA;
+using FLBA = FixedLenByteArray;
 
-MANUALLY_ALIGNED_STRUCT(1) Int96 {
-  uint32_t value[3];
-
-  bool operator==(const Int96& other) const {
-    return 0 == memcmp(this->value, other.value, 3 * sizeof(uint32_t));
-  }
-
-  bool operator!=(const Int96& other) const { return !(*this == other); }
-};
+MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; };
 STRUCT_END(Int96, 12);
 
+inline bool operator==(const Int96& left, const Int96& right) {
+  return std::equal(left.value, left.value + 3, right.value);
+}
+
+inline bool operator!=(const Int96& left, const Int96& right) { return !(left == right); }
+
 static inline std::string ByteArrayToString(const ByteArray& a) {
   return std::string(reinterpret_cast<const char*>(a.ptr), a.len);
 }
 
 static inline std::string Int96ToString(const Int96& a) {
-  std::stringstream result;
-  for (int i = 0; i < 3; i++) {
-    result << a.value[i] << " ";
-  }
+  std::ostringstream result;
+  std::copy(a.value, a.value + 3, std::ostream_iterator<uint32_t>(result, " "));
   return result.str();
 }
 
 static inline std::string FixedLenByteArrayToString(const FixedLenByteArray& a, int len) {
-  const uint8_t* bytes = reinterpret_cast<const uint8_t*>(a.ptr);
-  std::stringstream result;
-  for (int i = 0; i < len; i++) {
-    result << (uint32_t)bytes[i] << " ";
-  }
+  std::ostringstream result;
+  std::copy(a.ptr, a.ptr + len, std::ostream_iterator<uint32_t>(result, " "));
   return result.str();
 }
 
-static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
-  uint32_t len = std::min(x1.len, x2.len);
-  int cmp = memcmp(x1.ptr, x2.ptr, len);
-  if (cmp != 0) return cmp;
-  if (len < x1.len) return 1;
-  if (len < x2.len) return -1;
-  return 0;
-}
-
-template <int TYPE>
+template <Type::type TYPE>
 struct type_traits {};
 
 template <>
 struct type_traits<Type::BOOLEAN> {
-  typedef bool value_type;
-  static constexpr int value_byte_size = 1;
+  using value_type = bool;
 
+  static constexpr int value_byte_size = 1;
   static constexpr const char* printf_code = "d";
 };
 
 template <>
 struct type_traits<Type::INT32> {
-  typedef int32_t value_type;
+  using value_type = int32_t;
 
   static constexpr int value_byte_size = 4;
   static constexpr const char* printf_code = "d";
@@ -216,7 +201,7 @@
 
 template <>
 struct type_traits<Type::INT64> {
-  typedef int64_t value_type;
+  using value_type = int64_t;
 
   static constexpr int value_byte_size = 8;
   static constexpr const char* printf_code = "ld";
@@ -224,7 +209,7 @@
 
 template <>
 struct type_traits<Type::INT96> {
-  typedef Int96 value_type;
+  using value_type = Int96;
 
   static constexpr int value_byte_size = 12;
   static constexpr const char* printf_code = "s";
@@ -232,7 +217,7 @@
 
 template <>
 struct type_traits<Type::FLOAT> {
-  typedef float value_type;
+  using value_type = float;
 
   static constexpr int value_byte_size = 4;
   static constexpr const char* printf_code = "f";
@@ -240,7 +225,7 @@
 
 template <>
 struct type_traits<Type::DOUBLE> {
-  typedef double value_type;
+  using value_type = double;
 
   static constexpr int value_byte_size = 8;
   static constexpr const char* printf_code = "lf";
@@ -248,7 +233,7 @@
 
 template <>
 struct type_traits<Type::BYTE_ARRAY> {
-  typedef ByteArray value_type;
+  using value_type = ByteArray;
 
   static constexpr int value_byte_size = sizeof(ByteArray);
   static constexpr const char* printf_code = "s";
@@ -256,7 +241,7 @@
 
 template <>
 struct type_traits<Type::FIXED_LEN_BYTE_ARRAY> {
-  typedef FixedLenByteArray value_type;
+  using value_type = FixedLenByteArray;
 
   static constexpr int value_byte_size = sizeof(FixedLenByteArray);
   static constexpr const char* printf_code = "s";
@@ -264,18 +249,18 @@
 
 template <Type::type TYPE>
 struct DataType {
+  using c_type = typename type_traits<TYPE>::value_type;
   static constexpr Type::type type_num = TYPE;
-  typedef typename type_traits<TYPE>::value_type c_type;
 };
 
-typedef DataType<Type::BOOLEAN> BooleanType;
-typedef DataType<Type::INT32> Int32Type;
-typedef DataType<Type::INT64> Int64Type;
-typedef DataType<Type::INT96> Int96Type;
-typedef DataType<Type::FLOAT> FloatType;
-typedef DataType<Type::DOUBLE> DoubleType;
-typedef DataType<Type::BYTE_ARRAY> ByteArrayType;
-typedef DataType<Type::FIXED_LEN_BYTE_ARRAY> FLBAType;
+using BooleanType = DataType<Type::BOOLEAN>;
+using Int32Type = DataType<Type::INT32>;
+using Int64Type = DataType<Type::INT64>;
+using Int96Type = DataType<Type::INT96>;
+using FloatType = DataType<Type::FLOAT>;
+using DoubleType = DataType<Type::DOUBLE>;
+using ByteArrayType = DataType<Type::BYTE_ARRAY>;
+using FLBAType = DataType<Type::FIXED_LEN_BYTE_ARRAY>;
 
 template <typename Type>
 inline std::string format_fwf(int width) {