support forceShuffledHashJoin.

optimize shuffle read performance.

optimize small batch coalescing.

improve hash join by skipping null values.

improve metrics and logging.

fix decimal arithmetic operator data type error in spark333.
diff --git a/Cargo.lock b/Cargo.lock
index af72ba2..f85c460 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -872,6 +872,7 @@
  "once_cell",
  "paste",
  "postcard",
+ "radsort",
  "rand",
  "slimmer_box",
  "tempfile",
@@ -952,6 +953,7 @@
  "smallvec",
  "tempfile",
  "tokio",
+ "unchecked-index",
  "uuid",
  "zstd",
 ]
@@ -2094,6 +2096,12 @@
 checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09"
 
 [[package]]
+name = "radsort"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "17fd96390ed3feda12e1dfe2645ed587e0bea749e319333f104a33ff62f77a0b"
+
+[[package]]
 name = "rand"
 version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/native-engine/blaze-jni-bridge/src/jni_bridge.rs b/native-engine/blaze-jni-bridge/src/jni_bridge.rs
index c68ee9a..083b044 100644
--- a/native-engine/blaze-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/blaze-jni-bridge/src/jni_bridge.rs
@@ -93,44 +93,32 @@
                         ex,
                         $crate::jni_bridge::JavaClasses::get()
                             .cJavaThrowable
-                            .method_getMessage,
+                            .method_toString,
                         $crate::jni_bridge::JavaClasses::get()
                             .cJavaThrowable
-                            .method_getMessage_ret
+                            .method_toString_ret
                             .clone(),
                         &[],
                     )
                     .unwrap()
                     .l()
                     .unwrap();
-                if !message_obj.is_null() {
-                    let message = $env
-                        .get_string(message_obj.into())
-                        .map(|s| String::from(s))
-                        .unwrap();
-                    Err(
-                        $crate::jni_bridge::datafusion::error::DataFusionError::External(
-                            format!(
-                                "Java exception thrown at {}:{}: {}",
-                                file!(),
-                                line!(),
-                                message
-                            )
-                            .into(),
-                        ),
-                    )
-                } else {
-                    Err(
-                        $crate::jni_bridge::datafusion::error::DataFusionError::External(
-                            format!(
-                                "Java exception thrown at {}:{}: (no message)",
-                                file!(),
-                                line!()
-                            )
-                            .into(),
-                        ),
-                    )
-                }
+                let message = $env
+                    .get_string(message_obj.into())
+                    .map(|s| String::from(s))
+                    .unwrap();
+
+                Err(
+                    $crate::jni_bridge::datafusion::error::DataFusionError::External(
+                        format!(
+                            "Java exception thrown at {}:{}: {}",
+                            file!(),
+                            line!(),
+                            message
+                        )
+                        .into(),
+                    ),
+                )
             }
             Err(err) => Err(
                 $crate::jni_bridge::datafusion::error::DataFusionError::External(
@@ -200,6 +188,20 @@
 }
 
 #[macro_export]
+macro_rules! jni_get_direct_buffer {
+    ($value:expr) => {{
+        let pos = jni_call!(JavaBuffer($value).position() -> i32)? as usize;
+        let remaining = jni_call!(JavaBuffer($value).remaining() -> i32)? as usize;
+        $crate::jni_bridge::THREAD_JNIENV.with(|env| {
+            $crate::jni_map_error_with_env!(env, env.get_direct_buffer_address($value.into()))
+                .map(|s| unsafe {
+                    std::slice::from_raw_parts(s.add(pos), remaining)
+                })
+        })
+    }};
+}
+
+#[macro_export]
 macro_rules! jni_get_string {
     ($value:expr) => {{
         $crate::jni_bridge::THREAD_JNIENV.with(|env| {
@@ -219,6 +221,20 @@
 }
 
 #[macro_export]
+macro_rules! jni_get_byte_array_region {
+    ($value:expr, $start:expr, $buf:expr) => {{
+        $crate::jni_bridge::THREAD_JNIENV.with(|env| {
+            $crate::jni_map_error_with_env!(
+                env,
+                env.get_byte_array_region($value, $start as i32, unsafe {
+                    std::mem::transmute::<_, &mut [i8]>($buf)
+                })
+            )
+        })
+    }};
+}
+
+#[macro_export]
 macro_rules! jni_call {
     ($clsname:ident($obj:expr).$method:ident($($args:expr),* $(,)?) -> JObject) => {{
         $crate::jni_bridge::THREAD_JNIENV.with(|env| {
@@ -404,6 +420,7 @@
     pub cBlazeCallNativeWrapper: BlazeCallNativeWrapper<'a>,
     pub cBlazeOnHeapSpillManager: BlazeOnHeapSpillManager<'a>,
     pub cBlazeNativeParquetSinkUtils: BlazeNativeParquetSinkUtils<'a>,
+    pub cBlazeBlockObject: BlazeBlockObject<'a>,
 }
 
 #[allow(clippy::non_send_fields_in_send_ty)]
@@ -469,6 +486,7 @@
                 cBlazeCallNativeWrapper: BlazeCallNativeWrapper::new(env).unwrap(),
                 cBlazeOnHeapSpillManager: BlazeOnHeapSpillManager::new(env).unwrap(),
                 cBlazeNativeParquetSinkUtils: BlazeNativeParquetSinkUtils::new(env).unwrap(),
+                cBlazeBlockObject: BlazeBlockObject::new(env).unwrap(),
             };
             log::info!("Initializing JavaClasses finished");
             java_classes
@@ -635,8 +653,8 @@
 #[allow(non_snake_case)]
 pub struct JavaThrowable<'a> {
     pub class: JClass<'a>,
-    pub method_getMessage: JMethodID,
-    pub method_getMessage_ret: ReturnType,
+    pub method_toString: JMethodID,
+    pub method_toString_ret: ReturnType,
 }
 impl<'a> JavaThrowable<'a> {
     pub const SIG_TYPE: &'static str = "java/lang/Throwable";
@@ -645,8 +663,8 @@
         let class = get_global_jclass(env, Self::SIG_TYPE)?;
         Ok(JavaThrowable {
             class,
-            method_getMessage: env.get_method_id(class, "getMessage", "()Ljava/lang/String;")?,
-            method_getMessage_ret: ReturnType::Object,
+            method_toString: env.get_method_id(class, "toString", "()Ljava/lang/String;")?,
+            method_toString_ret: ReturnType::Object,
         })
     }
 }
@@ -872,6 +890,14 @@
     pub method_hasRemaining_ret: ReturnType,
     pub method_position: JMethodID,
     pub method_position_ret: ReturnType,
+    pub method_remaining: JMethodID,
+    pub method_remaining_ret: ReturnType,
+    pub method_isDirect: JMethodID,
+    pub method_isDirect_ret: ReturnType,
+    pub method_hasArray: JMethodID,
+    pub method_hasArray_ret: ReturnType,
+    pub method_array: JMethodID,
+    pub method_array_ret: ReturnType,
 }
 impl<'a> JavaBuffer<'a> {
     pub const SIG_TYPE: &'static str = "java/nio/Buffer";
@@ -884,6 +910,14 @@
             method_hasRemaining_ret: ReturnType::Primitive(Primitive::Boolean),
             method_position: env.get_method_id(class, "position", "()I")?,
             method_position_ret: ReturnType::Primitive(Primitive::Int),
+            method_remaining: env.get_method_id(class, "remaining", "()I")?,
+            method_remaining_ret: ReturnType::Primitive(Primitive::Int),
+            method_isDirect: env.get_method_id(class, "isDirect", "()Z")?,
+            method_isDirect_ret: ReturnType::Primitive(Primitive::Boolean),
+            method_hasArray: env.get_method_id(class, "hasArray", "()Z")?,
+            method_hasArray_ret: ReturnType::Primitive(Primitive::Boolean),
+            method_array: env.get_method_id(class, "array", "()Ljava/lang/Object;")?,
+            method_array_ret: ReturnType::Object,
         })
     }
 }
@@ -1400,6 +1434,60 @@
     }
 }
 
+#[allow(non_snake_case)]
+pub struct BlazeBlockObject<'a> {
+    pub class: JClass<'a>,
+    pub method_hasFileSegment: JMethodID,
+    pub method_hasFileSegment_ret: ReturnType,
+    pub method_hasByteBuffer: JMethodID,
+    pub method_hasByteBuffer_ret: ReturnType,
+    pub method_getFilePath: JMethodID,
+    pub method_getFilePath_ret: ReturnType,
+    pub method_getFileOffset: JMethodID,
+    pub method_getFileOffset_ret: ReturnType,
+    pub method_getFileLength: JMethodID,
+    pub method_getFileLength_ret: ReturnType,
+    pub method_getByteBuffer: JMethodID,
+    pub method_getByteBuffer_ret: ReturnType,
+    pub method_getChannel: JMethodID,
+    pub method_getChannel_ret: ReturnType,
+}
+
+impl<'a> BlazeBlockObject<'a> {
+    pub const SIG_TYPE: &'static str = "org/apache/spark/sql/execution/blaze/shuffle/BlockObject";
+
+    pub fn new(env: &JNIEnv<'a>) -> JniResult<BlazeBlockObject<'a>> {
+        let class = get_global_jclass(env, Self::SIG_TYPE)?;
+        Ok(BlazeBlockObject {
+            class,
+            method_hasFileSegment: env.get_method_id(class, "hasFileSegment", "()Z").unwrap(),
+            method_hasFileSegment_ret: ReturnType::Primitive(Primitive::Boolean),
+            method_hasByteBuffer: env.get_method_id(class, "hasByteBuffer", "()Z").unwrap(),
+            method_hasByteBuffer_ret: ReturnType::Primitive(Primitive::Boolean),
+            method_getFilePath: env
+                .get_method_id(class, "getFilePath", "()Ljava/lang/String;")
+                .unwrap(),
+            method_getFilePath_ret: ReturnType::Object,
+            method_getFileOffset: env.get_method_id(class, "getFileOffset", "()J").unwrap(),
+            method_getFileOffset_ret: ReturnType::Primitive(Primitive::Long),
+            method_getFileLength: env.get_method_id(class, "getFileLength", "()J").unwrap(),
+            method_getFileLength_ret: ReturnType::Primitive(Primitive::Long),
+            method_getByteBuffer: env
+                .get_method_id(class, "getByteBuffer", "()Ljava/nio/ByteBuffer;")
+                .unwrap(),
+            method_getByteBuffer_ret: ReturnType::Object,
+            method_getChannel: env
+                .get_method_id(
+                    class,
+                    "getChannel",
+                    "()Ljava/nio/channels/ReadableByteChannel;",
+                )
+                .unwrap(),
+            method_getChannel_ret: ReturnType::Object,
+        })
+    }
+}
+
 fn get_global_jclass(env: &JNIEnv<'_>, cls: &str) -> JniResult<JClass<'static>> {
     let local_jclass = env.find_class(cls)?;
     Ok(get_global_ref_jobject(env, local_jclass.into())?.into())
diff --git a/native-engine/blaze-serde/src/from_proto.rs b/native-engine/blaze-serde/src/from_proto.rs
index 25fe90f..03d454f 100644
--- a/native-engine/blaze-serde/src/from_proto.rs
+++ b/native-engine/blaze-serde/src/from_proto.rs
@@ -70,7 +70,6 @@
     filter_exec::FilterExec,
     generate::{create_generator, create_udtf_generator},
     generate_exec::GenerateExec,
-    hash_join_exec::HashJoinExec,
     ipc_reader_exec::IpcReaderExec,
     ipc_writer_exec::IpcWriterExec,
     limit_exec::LimitExec,
@@ -206,7 +205,7 @@
                 let build_side =
                     protobuf::JoinSide::try_from(hash_join.build_side).expect("invalid BuildSide");
 
-                Ok(Arc::new(HashJoinExec::try_new(
+                Ok(Arc::new(BroadcastJoinExec::try_new(
                     schema,
                     left,
                     right,
@@ -217,6 +216,8 @@
                     build_side
                         .try_into()
                         .map_err(|_| proto_error("invalid BuildSide"))?,
+                    false,
+                    None,
                 )?))
             }
             PhysicalPlanType::SortMergeJoin(sort_merge_join) => {
@@ -408,6 +409,7 @@
                     broadcast_side
                         .try_into()
                         .map_err(|_| proto_error("invalid BroadcastSide"))?,
+                    true,
                     Some(cached_build_hash_map_id),
                 )?))
             }
diff --git a/native-engine/datafusion-ext-commons/Cargo.toml b/native-engine/datafusion-ext-commons/Cargo.toml
index dfe9963..7f8870b 100644
--- a/native-engine/datafusion-ext-commons/Cargo.toml
+++ b/native-engine/datafusion-ext-commons/Cargo.toml
@@ -25,6 +25,7 @@
 once_cell = "1.19.0"
 paste = "1.0.15"
 postcard = { version = "1.0.8", features = ["alloc"]}
+radsort = "0.1.0"
 slimmer_box = "0.6.5"
 tempfile = "3"
 thrift = "0.17.0"
diff --git a/native-engine/datafusion-ext-commons/src/bytes_arena.rs b/native-engine/datafusion-ext-commons/src/bytes_arena.rs
index dd6e45e..ac96121 100644
--- a/native-engine/datafusion-ext-commons/src/bytes_arena.rs
+++ b/native-engine/datafusion-ext-commons/src/bytes_arena.rs
@@ -97,7 +97,7 @@
     }
 }
 
-#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, Default, PartialEq, Eq, Hash)]
 pub struct BytesArenaAddr(u64);
 
 impl BytesArenaAddr {
diff --git a/native-engine/datafusion-ext-commons/src/io/batch_serde.rs b/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
index 83aca5d..f6a6d06 100644
--- a/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
+++ b/native-engine/datafusion-ext-commons/src/io/batch_serde.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use std::{
-    io::{BufReader, BufWriter, Read, Write},
+    io::{Read, Write},
     mem::size_of,
     sync::Arc,
 };
@@ -22,9 +22,7 @@
     array::*,
     buffer::{Buffer, MutableBuffer},
     datatypes::*,
-    record_batch::{RecordBatch, RecordBatchOptions},
 };
-use bitvec::prelude::BitVec;
 use datafusion::common::Result;
 use unchecked_index::unchecked_index;
 
@@ -33,90 +31,32 @@
     io::{read_bytes_slice, read_len, write_len},
 };
 
-pub fn write_batch<W: Write>(batch: &RecordBatch, output: &mut W) -> Result<()> {
-    let mut output = BufWriter::new(output);
-    let schema = batch.schema();
-
+pub fn write_batch(num_rows: usize, cols: &[ArrayRef], mut output: impl Write) -> Result<()> {
     // write number of columns and rows
-    write_len(batch.num_columns(), &mut output)?;
-    write_len(batch.num_rows(), &mut output)?;
-
-    // write column data types
-    for field in schema.fields() {
-        write_data_type(field.data_type(), &mut output).map_err(|err| {
-            err.context(format!(
-                "batch_serde error writing data type: {}",
-                field.data_type()
-            ))
-        })?;
-    }
-
-    // write column nullables
-    let mut nullables = BitVec::<u8>::with_capacity(batch.num_columns());
-    for field in schema.fields() {
-        nullables.push(field.is_nullable());
-    }
-    output.write_all(&nullables.into_vec())?;
+    write_len(cols.len(), &mut output)?;
+    write_len(num_rows, &mut output)?;
 
     // write columns
-    for column in batch.columns() {
-        write_array(column, &mut output).map_err(|err| {
-            err.context(format!(
-                "batch_serde error writing column (data_type={})",
-                column.data_type()
-            ))
-        })?;
+    for col in cols {
+        write_data_type(col.data_type(), &mut output)?;
+        write_array(col, &mut output)?;
     }
     Ok(())
 }
 
-pub fn read_batch<R: Read>(input: &mut R) -> Result<RecordBatch> {
-    let mut input: Box<dyn Read> = Box::new(BufReader::new(input));
-
+pub fn read_batch(mut input: impl Read) -> Result<(usize, Vec<ArrayRef>)> {
     // read number of columns and rows
-    let num_columns = read_len(&mut input)?;
+    let num_cols = read_len(&mut input)?;
     let num_rows = read_len(&mut input)?;
 
-    // read column data types
-    let mut data_types = Vec::with_capacity(num_columns);
-    for _ in 0..num_columns {
-        data_types.push(
-            read_data_type(&mut input)
-                .map_err(|err| err.context("batch_serde error reading data type"))?,
-        );
-    }
-
-    // read nullables
-    let nullables_bytes = read_bytes_slice(&mut input, (num_columns + 7) / 8)?;
-    let nullables = BitVec::<u8>::from_vec(nullables_bytes.into());
-
-    // create schema
-    let schema = Arc::new(Schema::new(
-        data_types
-            .iter()
-            .enumerate()
-            .map(|(i, data_type)| Field::new("", data_type.clone(), nullables[i]))
-            .collect::<Fields>(),
-    ));
-
     // read columns
-    let columns = (0..num_columns)
-        .map(|i| {
-            read_array(&mut input, &data_types[i], num_rows).map_err(|err| {
-                err.context(format!(
-                    "batch_serde error reading column (data_type={}, num_rows={})",
-                    data_types[i], num_rows,
-                ))
-            })
+    let cols = (0..num_cols)
+        .map(|_| {
+            let dt = read_data_type(&mut input)?;
+            read_array(&mut input, &dt, num_rows)
         })
         .collect::<Result<_>>()?;
-
-    // create batch
-    Ok(RecordBatch::try_new_with_options(
-        schema,
-        columns,
-        &RecordBatchOptions::new().with_row_count(Some(num_rows)),
-    )?)
+    Ok((num_rows, cols))
 }
 
 pub fn write_array<W: Write>(array: &dyn Array, output: &mut W) -> Result<()> {
@@ -682,7 +622,7 @@
         batch_serde::{
             read_batch, read_primitive_raw_array, write_batch, write_primitive_raw_array,
         },
-        name_batch,
+        recover_named_batch,
     };
 
     #[test]
@@ -723,18 +663,24 @@
 
         // test read after write
         let mut buf = vec![];
-        write_batch(&batch, &mut buf).unwrap();
+        write_batch(batch.num_rows(), batch.columns(), &mut buf).unwrap();
         let mut cursor = Cursor::new(buf);
-        let decoded_batch = read_batch(&mut cursor).unwrap();
-        assert_eq!(name_batch(decoded_batch, &batch.schema()).unwrap(), batch);
+        let (decoded_num_rows, decoded_cols) = read_batch(&mut cursor).unwrap();
+        assert_eq!(
+            recover_named_batch(decoded_num_rows, &decoded_cols, batch.schema()).unwrap(),
+            batch
+        );
 
         // test read after write sliced
         let sliced = batch.slice(1, 2);
         let mut buf = vec![];
-        write_batch(&sliced, &mut buf).unwrap();
+        write_batch(sliced.num_rows(), sliced.columns(), &mut buf).unwrap();
         let mut cursor = Cursor::new(buf);
-        let decoded_batch = read_batch(&mut cursor).unwrap();
-        assert_eq!(name_batch(decoded_batch, &sliced.schema()).unwrap(), sliced);
+        let (decoded_num_rows, decoded_cols) = read_batch(&mut cursor).unwrap();
+        assert_eq!(
+            recover_named_batch(decoded_num_rows, &decoded_cols, batch.schema()).unwrap(),
+            sliced
+        );
     }
 
     #[test]
@@ -769,9 +715,9 @@
 
         // test read after write
         let mut buf = vec![];
-        write_batch(&batch, &mut buf).unwrap();
+        write_batch(batch.num_rows(), batch.columns(), &mut buf).unwrap();
         let mut cursor = Cursor::new(buf);
-        let decoded_batch = read_batch(&mut cursor).unwrap();
+        let (decoded_num_rows, decoded_cols) = read_batch(&mut cursor).unwrap();
         assert_batches_eq!(
             vec![
                 "+-----------+-----------+",
@@ -783,15 +729,15 @@
                 "| [6, 7]    | [6, 7]    |",
                 "+-----------+-----------+",
             ],
-            &[name_batch(decoded_batch, &batch.schema()).unwrap()]
+            &[recover_named_batch(decoded_num_rows, &decoded_cols, batch.schema()).unwrap()]
         );
 
         // test read after write sliced
         let sliced = batch.slice(1, 2);
         let mut buf = vec![];
-        write_batch(&sliced, &mut buf).unwrap();
+        write_batch(sliced.num_rows(), sliced.columns(), &mut buf).unwrap();
         let mut cursor = Cursor::new(buf);
-        let decoded_batch = read_batch(&mut cursor).unwrap();
+        let (decoded_num_rows, decoded_cols) = read_batch(&mut cursor).unwrap();
         assert_batches_eq!(
             vec![
                 "+----------+----------+",
@@ -801,7 +747,7 @@
                 "| [3, , 5] | [3, , 5] |",
                 "+----------+----------+",
             ],
-            &[name_batch(decoded_batch, &batch.schema()).unwrap()]
+            &[recover_named_batch(decoded_num_rows, &decoded_cols, sliced.schema()).unwrap()]
         );
     }
 
@@ -833,18 +779,24 @@
 
         // test read after write
         let mut buf = vec![];
-        write_batch(&batch, &mut buf).unwrap();
+        write_batch(batch.num_rows(), batch.columns(), &mut buf).unwrap();
         let mut cursor = Cursor::new(buf);
-        let decoded_batch = read_batch(&mut cursor).unwrap();
-        assert_eq!(name_batch(decoded_batch, &batch.schema()).unwrap(), batch);
+        let (decoded_num_rows, decoded_cols) = read_batch(&mut cursor).unwrap();
+        assert_eq!(
+            recover_named_batch(decoded_num_rows, &decoded_cols, batch.schema()).unwrap(),
+            batch
+        );
 
         // test read after write sliced
         let sliced = batch.slice(1, 2);
         let mut buf = vec![];
-        write_batch(&sliced, &mut buf).unwrap();
+        write_batch(sliced.num_rows(), sliced.columns(), &mut buf).unwrap();
         let mut cursor = Cursor::new(buf);
-        let decoded_batch = read_batch(&mut cursor).unwrap();
-        assert_eq!(name_batch(decoded_batch, &sliced.schema()).unwrap(), sliced);
+        let (decoded_num_rows, decoded_cols) = read_batch(&mut cursor).unwrap();
+        assert_eq!(
+            recover_named_batch(decoded_num_rows, &decoded_cols, sliced.schema()).unwrap(),
+            sliced
+        );
     }
 
     #[test]
@@ -865,17 +817,23 @@
 
         // test read after write
         let mut buf = vec![];
-        write_batch(&batch, &mut buf).unwrap();
+        write_batch(batch.num_rows(), batch.columns(), &mut buf).unwrap();
         let mut cursor = Cursor::new(buf);
-        let decoded_batch = read_batch(&mut cursor).unwrap();
-        assert_eq!(name_batch(decoded_batch, &batch.schema()).unwrap(), batch);
+        let (decoded_num_rows, decoded_cols) = read_batch(&mut cursor).unwrap();
+        assert_eq!(
+            recover_named_batch(decoded_num_rows, &decoded_cols, batch.schema()).unwrap(),
+            batch
+        );
 
         // test read after write sliced
         let sliced = batch.slice(1, 2);
         let mut buf = vec![];
-        write_batch(&sliced, &mut buf).unwrap();
+        write_batch(sliced.num_rows(), sliced.columns(), &mut buf).unwrap();
         let mut cursor = Cursor::new(buf);
-        let decoded_batch = read_batch(&mut cursor).unwrap();
-        assert_eq!(name_batch(decoded_batch, &sliced.schema()).unwrap(), sliced);
+        let (decoded_num_rows, decoded_cols) = read_batch(&mut cursor).unwrap();
+        assert_eq!(
+            recover_named_batch(decoded_num_rows, &decoded_cols, batch.schema()).unwrap(),
+            sliced
+        );
     }
 }
diff --git a/native-engine/datafusion-ext-commons/src/io/mod.rs b/native-engine/datafusion-ext-commons/src/io/mod.rs
index d1ad0e4..80ae786 100644
--- a/native-engine/datafusion-ext-commons/src/io/mod.rs
+++ b/native-engine/datafusion-ext-commons/src/io/mod.rs
@@ -12,67 +12,67 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::io::{Read, Seek, SeekFrom, Write};
+use std::io::{Read, Write};
 
 use arrow::{
-    array::StructArray,
-    datatypes::{DataType, SchemaRef},
+    array::{Array, ArrayRef, RecordBatchOptions},
+    datatypes::SchemaRef,
     record_batch::RecordBatch,
 };
 pub use batch_serde::{read_array, read_data_type, write_array, write_data_type};
-use datafusion::common::{cast::as_struct_array, Result};
+use datafusion::common::Result;
 pub use scalar_serde::{read_scalar, write_scalar};
 
+use crate::cast::cast;
+
 mod batch_serde;
 mod scalar_serde;
 
-pub fn write_one_batch<W: Write + Seek>(batch: &RecordBatch, output: &mut W) -> Result<()> {
-    if batch.num_rows() == 0 {
-        return Ok(());
-    }
-    // write ipc_length placeholder
-    let start_pos = output.stream_position()?;
-    output.write_all(&[0u8; 8])?;
+pub fn write_one_batch(num_rows: usize, cols: &[ArrayRef], mut output: impl Write) -> Result<()> {
+    assert!(cols.iter().all(|col| col.len() == num_rows));
 
-    // write
-    batch_serde::write_batch(batch, output)?;
-    let end_pos = output.stream_position()?;
-    let ipc_length = end_pos - start_pos - 8;
-
-    // fill ipc length
-    output.seek(SeekFrom::Start(start_pos))?;
-    output.write_all(&ipc_length.to_le_bytes()[..])?;
-    output.seek(SeekFrom::Start(end_pos))?;
+    let mut batch_data = vec![];
+    batch_serde::write_batch(num_rows, cols, &mut batch_data)?;
+    write_len(batch_data.len(), &mut output)?;
+    output.write_all(&batch_data)?;
     Ok(())
 }
 
-pub fn read_one_batch<R: Read>(input: &mut R, schema: &SchemaRef) -> Result<Option<RecordBatch>> {
-    // read ipc length
-    let mut ipc_length_buf = [0u8; 8];
-    if let Err(e) = input.read_exact(&mut ipc_length_buf) {
-        if e.kind() == std::io::ErrorKind::UnexpectedEof {
-            return Ok(None);
+pub fn read_one_batch(mut input: impl Read) -> Result<Option<(usize, Vec<ArrayRef>)>> {
+    let batch_data_len = match read_len(&mut input) {
+        Ok(len) => len,
+        Err(e) => {
+            if e.kind() == std::io::ErrorKind::UnexpectedEof {
+                return Ok(None);
+            }
+            return Err(e.into());
         }
-        return Err(e.into());
-    }
-    let ipc_length = u64::from_le_bytes(ipc_length_buf);
-    let mut input = Box::new(input.take(ipc_length));
-
-    // read
-    let nameless_batch = batch_serde::read_batch(&mut input)?;
+    };
+    let mut input = input.take(batch_data_len as u64);
+    let (num_rows, cols) = batch_serde::read_batch(&mut input)?;
 
     // consume trailing bytes
     std::io::copy(&mut input, &mut std::io::sink())?;
 
-    // recover schema name
-    return Ok(Some(name_batch(nameless_batch, schema)?));
+    assert!(cols.iter().all(|col| col.len() == num_rows));
+    return Ok(Some((num_rows, cols)));
 }
 
-pub fn name_batch(batch: RecordBatch, name_schema: &SchemaRef) -> Result<RecordBatch> {
-    Ok(RecordBatch::from(as_struct_array(&crate::cast::cast(
-        &StructArray::from(batch),
-        &DataType::Struct(name_schema.fields.clone()),
-    )?)?))
+pub fn recover_named_batch(
+    num_rows: usize,
+    cols: &[ArrayRef],
+    schema: SchemaRef,
+) -> Result<RecordBatch> {
+    let cols = cols
+        .iter()
+        .zip(schema.fields())
+        .map(|(col, field)| Ok(cast(&col, field.data_type())?))
+        .collect::<Result<Vec<_>>>()?;
+    Ok(RecordBatch::try_new_with_options(
+        schema,
+        cols,
+        &RecordBatchOptions::new().with_row_count(Some(num_rows)),
+    )?)
 }
 
 pub fn write_len<W: Write>(mut len: usize, output: &mut W) -> std::io::Result<()> {
diff --git a/native-engine/datafusion-ext-commons/src/rdxsort.rs b/native-engine/datafusion-ext-commons/src/rdxsort.rs
index 95fff51..f32c610 100644
--- a/native-engine/datafusion-ext-commons/src/rdxsort.rs
+++ b/native-engine/datafusion-ext-commons/src/rdxsort.rs
@@ -12,110 +12,46 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#[inline]
-pub fn radix_sort_u16_by<T>(array: &mut [T], key: impl Fn(&T) -> u16) -> Vec<usize> {
-    radix_sort_u16_ranged_by(array, 65536, key)
+use std::vec::IntoIter;
+
+use radsort::Key;
+
+const STD_SORT_LIMIT: usize = 4096;
+
+pub fn radix_sort_unstable(array: &mut [impl Key + Ord]) {
+    radix_sort_unstable_by_key(array, |v| *v);
 }
 
-#[inline]
-pub fn radix_sort_u16_ranged_by<T>(
-    array: &mut [T],
-    num_keys: usize,
-    key: impl Fn(&T) -> u16,
-) -> Vec<usize> {
-    // performance critical
-    unsafe {
-        // count
-        let mut counts = vec![0; num_keys];
-        for item in array.iter() {
-            *counts.get_unchecked_mut(key(item) as usize) += 1;
-        }
-
-        // construct parts
-        #[derive(Default, Clone, Copy)]
-        struct Part {
-            cur: usize,
-            end: usize,
-        }
-        let mut parts = vec![Part::default(); num_keys];
-        let mut beg = 0;
-        for (idx, count) in counts.iter().enumerate() {
-            if *count > 0 {
-                *parts.get_unchecked_mut(idx) = Part {
-                    cur: beg,
-                    end: beg + count,
-                };
-                beg += count;
-            }
-        }
-
-        // reorganize each partition
-        let mut inexhausted_part_indices = vec![0; num_keys];
-        for i in 0..num_keys {
-            inexhausted_part_indices[i] = i;
-        }
-        while {
-            inexhausted_part_indices.retain(|&i| {
-                let part = parts.get_unchecked(i);
-                part.cur < part.end
-            });
-            inexhausted_part_indices.len() > 1
-        } {
-            for &part_idx in &inexhausted_part_indices {
-                let cur_part = parts.get_unchecked(part_idx);
-                let cur = cur_part.cur;
-                let end = cur_part.end;
-                for item_idx in cur..end {
-                    let target_part_idx = key(array.get_unchecked(item_idx)) as usize;
-                    let target_part = parts.get_unchecked_mut(target_part_idx);
-                    array.swap_unchecked(item_idx, target_part.cur);
-                    target_part.cur += 1;
-                }
-            }
-        }
-
-        // returns counts of each bucket
-        counts
+pub fn radix_sort_unstable_by_key<T, K: Key + Ord>(array: &mut [T], key: impl Fn(&T) -> K) {
+    if array.len() < STD_SORT_LIMIT {
+        array.sort_unstable_by_key(key);
+    } else {
+        radsort::sort_by_key(array, key);
     }
 }
 
-#[cfg(test)]
-mod test {
-    use rand::Rng;
-
-    use crate::rdxsort::radix_sort_u16_by;
-
-    #[test]
-    fn fuzzytest_u16_small() {
-        for n in 0..1000 {
-            let mut array = vec![];
-            for _ in 0..n {
-                array.push(rand::thread_rng().gen::<u16>());
-            }
-
-            let mut array1 = array.clone();
-            radix_sort_u16_by(&mut array1, |key| *key);
-
-            let mut array2 = array.clone();
-            array2.sort_unstable();
-
-            assert_eq!(array1, array2);
-        }
+pub trait RadixSortIterExt: Iterator {
+    fn radix_sorted_unstable(self) -> IntoIter<Self::Item>
+    where
+        Self: Sized,
+        Self::Item: Key + Ord,
+    {
+        let mut vec: Vec<Self::Item> = self.collect();
+        radix_sort_unstable(&mut vec);
+        vec.into_iter()
     }
 
-    #[test]
-    fn fuzzytest_u16_1m() {
-        let mut array = vec![];
-        for _ in 0..1000000 {
-            array.push(rand::thread_rng().gen::<u16>());
-        }
-
-        let mut array1 = array.clone();
-        radix_sort_u16_by(&mut array1, |key| *key);
-
-        let mut array2 = array.clone();
-        array2.sort_unstable();
-
-        assert_eq!(array1, array2);
+    fn radix_sorted_unstable_by_key<K: Key + Ord>(
+        self,
+        key: impl Fn(&Self::Item) -> K,
+    ) -> IntoIter<Self::Item>
+    where
+        Self: Sized,
+    {
+        let mut vec: Vec<Self::Item> = self.collect();
+        radix_sort_unstable_by_key(&mut vec, key);
+        vec.into_iter()
     }
 }
+
+impl<T, I: Iterator<Item = T>> RadixSortIterExt for I {}
diff --git a/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs b/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
index acfb267..4b9dadd 100644
--- a/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
+++ b/native-engine/datafusion-ext-commons/src/streams/coalesce_stream.rs
@@ -19,9 +19,14 @@
 };
 
 use arrow::{
-    datatypes::SchemaRef,
+    array::{make_array, new_empty_array, Array, ArrayRef, AsArray, Capacities, MutableArrayData},
+    datatypes::{
+        ArrowNativeType, BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, SchemaRef,
+        Utf8Type,
+    },
     record_batch::{RecordBatch, RecordBatchOptions},
 };
+use arrow_schema::DataType;
 use datafusion::{
     common::Result,
     execution::TaskContext,
@@ -107,9 +112,9 @@
 
         // coalesce each column
         let mut coalesced_cols = vec![];
-        for cols in all_cols {
-            let ref_cols = cols.iter().map(|col| col.as_ref()).collect::<Vec<_>>();
-            coalesced_cols.push(arrow::compute::concat(&ref_cols)?);
+        for (cols, field) in all_cols.into_iter().zip(schema.fields()) {
+            let dt = field.data_type();
+            coalesced_cols.push(coalesce_arrays_unchecked(dt, &cols));
         }
         let coalesced_batch = RecordBatch::try_new_with_options(
             schema,
@@ -171,3 +176,46 @@
         }
     }
 }
+
+/// Concatenates `arrays` into a single array without checking their data types;
+/// callers must make sure all arrays share the same data type.
+pub fn coalesce_arrays_unchecked(data_type: &DataType, arrays: &[ArrayRef]) -> ArrayRef {
+    // zero or one input needs no copying at all
+    match arrays {
+        [] => return new_empty_array(data_type),
+        [single] => return single.clone(),
+        _ => {}
+    }
+
+    // item and byte capacities summed over variable-length (string/binary) arrays
+    fn binary_capacity<T: ByteArrayType>(arrays: &[ArrayRef]) -> Capacities {
+        let (mut num_items, mut num_bytes) = (0, 0);
+        for array in arrays {
+            let bytes_array = array.as_bytes::<T>();
+            // Guaranteed to always have at least one element
+            let offsets = bytes_array.value_offsets();
+            num_bytes += offsets[offsets.len() - 1].as_usize() - offsets[0].as_usize();
+            num_items += bytes_array.len();
+        }
+        Capacities::Binary(num_items, Some(num_bytes))
+    }
+
+    let capacity = match data_type {
+        DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
+        DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
+        DataType::Binary => binary_capacity::<BinaryType>(arrays),
+        DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
+        _ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
+    };
+
+    // append every input, in order, into one pre-sized MutableArrayData
+    let array_data: Vec<_> = arrays.iter().map(|a| a.to_data()).collect();
+    let array_data_refs = array_data.iter().collect();
+    let mut mutable = MutableArrayData::with_capacities(array_data_refs, false, capacity);
+
+    for (idx, array) in arrays.iter().enumerate() {
+        mutable.extend(idx, 0, array.len());
+    }
+
+    make_array(mutable.freeze())
+}
diff --git a/native-engine/datafusion-ext-exprs/src/named_struct.rs b/native-engine/datafusion-ext-exprs/src/named_struct.rs
index 891dc37..0093869 100644
--- a/native-engine/datafusion-ext-exprs/src/named_struct.rs
+++ b/native-engine/datafusion-ext-exprs/src/named_struct.rs
@@ -19,11 +19,7 @@
     sync::Arc,
 };
 
-use arrow::{
-    array::Array,
-    datatypes::{Field, Fields, SchemaRef},
-    record_batch::RecordBatchOptions,
-};
+use arrow::datatypes::SchemaRef;
 use datafusion::{
     arrow::{
         array::StructArray,
@@ -34,7 +30,7 @@
     logical_expr::ColumnarValue,
     physical_expr::{physical_exprs_bag_equal, PhysicalExpr},
 };
-use datafusion_ext_commons::{df_execution_err, io::name_batch};
+use datafusion_ext_commons::{df_execution_err, io::recover_named_batch};
 
 use crate::down_cast_any_ref;
 
@@ -102,17 +98,9 @@
                     .and_then(|r| r.into_array(batch.num_rows()))
             })
             .collect::<Result<Vec<_>>>()?;
-        let input_empty_fields = input_arrays
-            .iter()
-            .map(|array| Field::new("", array.data_type().clone(), array.null_count() > 0))
-            .collect::<Fields>();
-        let input_batch = RecordBatch::try_new_with_options(
-            Arc::new(Schema::new(input_empty_fields)),
-            input_arrays,
-            &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
-        )?;
 
-        let named_batch = name_batch(input_batch, &self.return_schema)?;
+        let named_batch =
+            recover_named_batch(batch.num_rows(), &input_arrays, self.return_schema.clone())?;
         let named_struct = Arc::new(StructArray::from(named_batch));
         Ok(ColumnarValue::Array(named_struct))
     }
diff --git a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
index 48a2e52..688101d 100644
--- a/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
+++ b/native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs
@@ -20,7 +20,7 @@
 };
 
 use arrow::{
-    array::{as_struct_array, make_array, ArrayRef},
+    array::{as_struct_array, make_array, Array, ArrayRef},
     datatypes::{DataType, Field, Schema, SchemaRef},
     ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
     record_batch::{RecordBatch, RecordBatchOptions},
@@ -33,7 +33,10 @@
     error::Result, logical_expr::ColumnarValue, physical_expr::physical_exprs_bag_equal,
     physical_plan::PhysicalExpr,
 };
-use datafusion_ext_commons::{cast::cast, df_execution_err, ffi_helper::batch_to_ffi};
+use datafusion_ext_commons::{
+    cast::cast, df_execution_err, ffi_helper::batch_to_ffi,
+    streams::coalesce_stream::coalesce_arrays_unchecked,
+};
 use jni::objects::GlobalRef;
 use once_cell::sync::OnceCell;
 
@@ -189,12 +192,8 @@
             .into_iter()
             .map(|fut| fut.join().unwrap())
             .collect::<Result<Vec<_>>>()?;
-        let imported_array = arrow::compute::concat(
-            &sub_imported_arrays
-                .iter()
-                .map(|array| array.as_ref())
-                .collect::<Vec<_>>(),
-        )?;
+        let imported_array =
+            coalesce_arrays_unchecked(sub_imported_arrays[0].data_type(), &sub_imported_arrays);
         Ok(ColumnarValue::Array(imported_array))
     }
 
diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml
index 0706c36..9043081 100644
--- a/native-engine/datafusion-ext-plans/Cargo.toml
+++ b/native-engine/datafusion-ext-plans/Cargo.toml
@@ -39,5 +39,6 @@
 smallvec = "1.13.2"
 tempfile = "3"
 tokio = "1.39"
+unchecked-index = "0.2.2"
 uuid = "1.10.0"
 zstd = "0.13.2"
diff --git a/native-engine/datafusion-ext-plans/src/agg/acc.rs b/native-engine/datafusion-ext-plans/src/agg/acc.rs
index 1964507..3deb9f1 100644
--- a/native-engine/datafusion-ext-plans/src/agg/acc.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/acc.rs
@@ -14,6 +14,7 @@
 
 use std::{
     any::Any,
+    hash::Hasher,
     io::{Cursor, Read, Write},
     mem::{size_of, size_of_val},
 };
@@ -29,13 +30,12 @@
     slim_bytes::SlimBytes,
     spark_bloom_filter::SparkBloomFilter,
 };
+use gxhash::GxHasher;
 use hashbrown::raw::RawTable;
 use itertools::Itertools;
 use slimmer_box::SlimmerBox;
 use smallvec::SmallVec;
 
-use crate::agg::agg_table::gx_hash;
-
 pub type DynVal = Option<Box<dyn AggDynValue>>;
 
 const ACC_STORE_BLOCK_SIZE: usize = 65536;
@@ -69,6 +69,10 @@
         self.dyn_store = vec![];
     }
 
+    pub fn len(&self) -> usize {
+        self.num_accs // the same counter `mem_size` multiplies the per-acc size by
+    }
+
     pub fn mem_size(&self) -> usize {
         self.num_accs * (self.fixed_len() + self.dyns_len() * size_of::<DynVal>() + 32)
     }
@@ -411,6 +415,20 @@
 pub type LoadFn = Box<dyn Fn(&mut LoadReader) -> Result<DynVal> + Send + Sync>;
 pub type SaveFn = Box<dyn Fn(&mut SaveWriter, DynVal) -> Result<()> + Send + Sync>;
 
+/// Hashes `value` with gxhash under a fixed seed, keeping 32 bits of the
+/// result with the highest bit forced on (so the hash is never `0`).
+///
+/// NOTE(review): callers in this file widen the result with `as u64` before
+/// handing it to a `RawTable`, leaving the upper 32 bits zero -- confirm the
+/// table does not rely on those bits to tell entries apart.
+#[inline]
+pub fn acc_hash(value: impl AsRef<[u8]>) -> u32 {
+    const AGG_DYN_SET_HASH_SEED: i64 = 0x7BCB48DA4C72B4F2;
+    let mut hasher = GxHasher::with_seed(AGG_DYN_SET_HASH_SEED);
+    hasher.write(value.as_ref());
+    (hasher.finish() as u32) | 0x8000_0000
+}
+
 pub fn create_dyn_loaders_from_initial_value(values: &[AccumInitialValue]) -> Result<Vec<LoadFn>> {
     let mut loaders: Vec<LoadFn> = vec![];
     for value in values {
@@ -501,9 +513,9 @@
                                 InternalSet::Small(s) => s.push(pos_len),
                                 InternalSet::Huge(s) => {
                                     let raw = list.ref_raw(pos_len);
-                                    let hash = gx_hash::<AGG_DYN_SET_HASH_SEED>(raw);
+                                    let hash = acc_hash(raw) as u64;
                                     s.insert(hash, pos_len, |&pos_len| {
-                                        gx_hash::<AGG_DYN_SET_HASH_SEED>(list.ref_raw(pos_len))
+                                        acc_hash(list.ref_raw(pos_len)) as u64
                                     });
                                 }
                             }
@@ -901,9 +913,9 @@
 
             for &mut pos_len in s {
                 let raw = list.ref_raw(pos_len);
-                let hash = gx_hash::<AGG_DYN_SET_HASH_SEED>(raw);
+                let hash = acc_hash(raw) as u64;
                 huge.insert(hash, pos_len, |&pos_len| {
-                    gx_hash::<AGG_DYN_SET_HASH_SEED>(list.ref_raw(pos_len))
+                    acc_hash(list.ref_raw(pos_len)) as u64
                 });
             }
             *self = Self::Huge(huge);
@@ -911,8 +923,6 @@
     }
 }
 
-const AGG_DYN_SET_HASH_SEED: i64 = 0x7BCB48DA4C72B4F2;
-
 impl AggDynSet {
     pub fn append(&mut self, value: &ScalarValue, nullable: bool) {
         let old_raw_len = self.list.raw.len();
@@ -954,11 +964,11 @@
                 }
             }
             InternalSet::Huge(s) => {
-                let hash = gx_hash::<AGG_DYN_SET_HASH_SEED>(raw);
+                let hash = acc_hash(raw) as u64;
                 match s.find_or_find_insert_slot(
                     hash,
                     |&pos_len| new_len == pos_len.1 as usize && raw == self.list.ref_raw(pos_len),
-                    |&pos_len| gx_hash::<AGG_DYN_SET_HASH_SEED>(self.list.ref_raw(pos_len)),
+                    |&pos_len| acc_hash(self.list.ref_raw(pos_len)) as u64,
                 ) {
                     Ok(_found) => {}
                     Err(slot) => {
@@ -993,13 +1003,13 @@
             }
             InternalSet::Huge(s) => {
                 let new_value = self.list.ref_raw(new_pos_len);
-                let hash = gx_hash::<AGG_DYN_SET_HASH_SEED>(new_value);
+                let hash = acc_hash(new_value) as u64;
                 match s.find_or_find_insert_slot(
                     hash,
                     |&pos_len| {
                         new_len == pos_len.1 as usize && new_value == self.list.ref_raw(pos_len)
                     },
-                    |&pos_len| gx_hash::<AGG_DYN_SET_HASH_SEED>(self.list.ref_raw(pos_len)),
+                    |&pos_len| acc_hash(self.list.ref_raw(pos_len)) as u64,
                 ) {
                     Ok(_found) => {
                         inserted = false;
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs
new file mode 100644
index 0000000..67a2d55
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs
@@ -0,0 +1,136 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{iter::Filter, vec::IntoIter};
+
+use datafusion_ext_commons::bytes_arena::{BytesArena, BytesArenaAddr};
+
+use crate::unchecked;
+
+/// One slot of an [`AggHashMap`].
+///
+/// A slot whose `non_zero_hash` is `0` is empty; an occupied slot stores the
+/// hash of its key, the address of the key bytes and an accumulator index.
+#[derive(Default, Clone, Copy)]
+pub struct AggHashMapItem {
+    pub key_addr: BytesArenaAddr,
+    pub non_zero_hash: u32,
+    pub acc_idx: u32,
+}
+
+/// Outcome of [`AggHashMap::lookup`].
+pub enum AggHashMapLookupEntry<'a> {
+    /// A slot with the same hash and the same key bytes already exists.
+    Occupied(&'a mut AggHashMapItem),
+    /// No such slot exists; this is the empty slot probing stopped at.
+    Vaccant(&'a mut AggHashMapItem),
+}
+
+/// Open-addressing hash table with linear probing over a flat slot vector.
+#[derive(Default)]
+pub struct AggHashMap {
+    items: Vec<AggHashMapItem>,
+}
+
+impl AggHashMap {
+    /// Looks up `key`, probing linearly from slot `non_zero_hash % len`.
+    ///
+    /// Returns `Occupied` for a slot holding the same hash and key bytes
+    /// (read from `bytes`), otherwise `Vaccant` with the first empty slot
+    /// met. Filling a vacant slot is left to the caller, who must store a
+    /// hash different from `0` in it since `0` marks a slot as empty.
+    ///
+    /// The table must already be allocated and keep at least one empty slot
+    /// (see [`Self::ensure_capacity`]); otherwise this panics or never returns.
+    pub fn lookup<'a>(
+        &'a mut self,
+        bytes: &BytesArena,
+        non_zero_hash: u32,
+        key: impl AsRef<[u8]>,
+    ) -> AggHashMapLookupEntry<'a> {
+        debug_assert!(!self.items.is_empty(), "AggHashMap::lookup called before ensure_capacity");
+        let key = key.as_ref();
+        let num_buckets = self.items.len();
+        let mut bucket = (non_zero_hash as usize) % num_buckets;
+
+        // Probe with unchecked indexing, remembering only where probing ended;
+        // `bucket` stays in range because it is always reduced modulo the length.
+        let occupied = {
+            let items = unchecked!(&mut self.items);
+            loop {
+                let item = &items[bucket];
+                if item.non_zero_hash == 0 {
+                    break false;
+                }
+                if item.non_zero_hash == non_zero_hash && bytes.get(item.key_addr) == key {
+                    break true;
+                }
+                bucket = (bucket + 1) % num_buckets;
+            }
+        };
+
+        // Borrow the slot straight from `self.items` so that it carries the
+        // lifetime `'a` without any lifetime-extending transmute.
+        let item = &mut self.items[bucket];
+        if occupied {
+            AggHashMapLookupEntry::Occupied(item)
+        } else {
+            AggHashMapLookupEntry::Vaccant(item)
+        }
+    }
+
+    /// Bytes reserved by the slot vector (its capacity times the slot size).
+    pub fn mem_size(&self) -> usize {
+        self.items.capacity() * size_of::<AggHashMapItem>()
+    }
+
+    /// Makes room for `len` entries: unless the table already has more than
+    /// `len * 2 + 16` slots, it is rebuilt with `len * 4 + 16` slots and every
+    /// occupied slot is re-inserted by linear probing.
+    pub fn ensure_capacity(&mut self, len: usize) {
+        if self.items.len() > len * 2 + 16 {
+            return;
+        }
+        let new_capacity = len * 4 + 16;
+        let old_self = std::mem::replace(
+            self,
+            Self {
+                items: vec![AggHashMapItem::default(); new_capacity],
+            },
+        );
+        let mut new_items = unchecked!(&mut self.items);
+
+        // iterating the old map yields occupied slots only
+        for item in old_self {
+            let mut bucket = (item.non_zero_hash as usize) % new_items.len();
+            while new_items[bucket].non_zero_hash != 0 {
+                bucket += 1;
+                bucket %= new_items.len();
+            }
+            new_items[bucket] = item;
+        }
+    }
+}
+
+/// Consumes the map and yields its occupied slots (`non_zero_hash != 0`).
+impl IntoIterator for AggHashMap {
+    type Item = AggHashMapItem;
+    type IntoIter = Filter<IntoIter<AggHashMapItem>, fn(&AggHashMapItem) -> bool>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.items
+            .into_iter()
+            .filter(|item| item.non_zero_hash != 0)
+    }
+}
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
index 194f4bc..c5f19b2 100644
--- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
@@ -35,22 +35,22 @@
 };
 use datafusion_ext_commons::{
     array_size::ArraySize,
-    bytes_arena::{BytesArena, BytesArenaAddr},
+    bytes_arena::BytesArena,
     downcast_any,
     ds::rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree},
     io::{read_bytes_slice, read_len, write_len},
-    rdxsort::radix_sort_u16_ranged_by,
+    rdxsort::radix_sort_unstable_by_key,
     slim_bytes::SlimBytes,
     staging_mem_size_for_partial_sort, suggested_output_batch_mem_size,
 };
 use futures::lock::Mutex;
 use gxhash::GxHasher;
-use hashbrown::raw::RawTable;
 
 use crate::{
     agg::{
         acc::{AccStore, AccumStateRow, OwnedAccumStateRow, RefAccumStateRow},
         agg_context::AggContext,
+        agg_hash_map::{AggHashMap, AggHashMapLookupEntry},
     },
     common::output::WrappedRecordBatchSender,
     memmgr::{
@@ -120,9 +120,7 @@
             // compute input arrays
             match in_mem.mode {
                 InMemMode::Hashing => {
-                    in_mem
-                        .hashing_data
-                        .update_batch::<GX_HASH_SEED_HASHING>(input_batch)?;
+                    in_mem.hashing_data.update_batch(input_batch)?;
                 }
                 InMemMode::Merging => {
                     in_mem.merging_data.add_batch(input_batch)?;
@@ -241,9 +239,9 @@
                 .hashing_data
                 .map
                 .into_iter()
-                .map(|(key_addr, acc_addr)| {
-                    let key = in_mem.hashing_data.map_key_store.get(key_addr);
-                    let acc = in_mem.hashing_data.acc_store.get(acc_addr);
+                .map(|item| {
+                    let key = in_mem.hashing_data.map_key_store.get(item.key_addr);
+                    let acc = in_mem.hashing_data.acc_store.get(item.acc_idx);
                     (key, acc)
                 })
                 .collect::<Vec<_>>();
@@ -306,13 +304,13 @@
                 let map = cur_hashing.map;
                 let map_key_store = cur_hashing.map_key_store;
                 let acc_store = cur_hashing.acc_store;
-                for (key_addr, value) in map {
+                for item in map {
                     let key = unsafe {
                         // safety:
                         // map_key_store will be append-only while processing the same bucket
-                        std::mem::transmute::<_, &[u8]>(map_key_store.get(key_addr))
+                        std::mem::transmute::<_, &[u8]>(map_key_store.get(item.key_addr))
                     };
-                    let acc = acc_store.get(value);
+                    let acc = acc_store.get(item.acc_idx);
                     staging_records.push((key, acc));
                 }
                 let batch = self.agg_ctx.convert_records_to_batch(staging_records)?;
@@ -348,24 +346,19 @@
 
             // merge records of current bucket
             while min_cursor.cur_bucket_idx == current_bucket_idx {
+                hashing.map.ensure_capacity(hashing.num_records() + 1);
                 let (key, mut acc) = min_cursor.next_record()?;
-                let hash = gx_hash::<GX_HASH_SEED_POST_MERGING>(&key);
-                match hashing.map.find_or_find_insert_slot(
-                    hash,
-                    |v| key.as_ref() == hashing.map_key_store.get(v.0),
-                    |v| gx_hash::<GX_HASH_SEED_POST_MERGING>(hashing.map_key_store.get(v.0)),
-                ) {
-                    Ok(found) => unsafe {
-                        // safety - access hashbrown raw table
-                        let old_acc = &mut hashing.acc_store.get(found.as_mut().1);
+                let hash = merging_data_hash(&key);
+                match hashing.map.lookup(&hashing.map_key_store, hash, &key) {
+                    AggHashMapLookupEntry::Occupied(item) => {
+                        let old_acc = &mut hashing.acc_store.get(item.acc_idx);
                         self.agg_ctx.partial_merge(old_acc, &mut acc.as_mut())?;
-                    },
-                    Err(slot) => unsafe {
-                        // safety - access hashbrown raw table
-                        let key_addr = hashing.map_key_store.add(key.as_ref());
-                        let acc_addr = hashing.acc_store.new_acc_from(&acc);
-                        hashing.map.insert_in_slot(hash, slot, (key_addr, acc_addr));
-                    },
+                    }
+                    AggHashMapLookupEntry::Vaccant(item) => {
+                        item.non_zero_hash = hash;
+                        item.key_addr = hashing.map_key_store.add(&key);
+                        item.acc_idx = hashing.acc_store.new_acc_from(&acc);
+                    }
                 }
             }
         }
@@ -527,26 +520,50 @@
 
 // hasher used in table
 const GX_HASH_SEED_HASHING: i64 = 0x3F6F1B9378DD6AAF;
-const GX_HASH_SEED_MERGING: i64 = 0x7A9A2D4E8C19B4EB;
-const GX_HASH_SEED_POST_MERGING: i64 = 0x1CE19D40EEED6CA2;
+const GX_HASH_SEED_MERGING: i64 = 0x1CE19D40EEED6CA2;
 
+/// Hashes `value` with gxhash seeded by `SEED` and keeps 32 bits of the result.
+///
+/// The highest bit is forced on, so the returned hash is never `0`.
 #[inline]
-pub fn gx_hash<const SEED: i64>(value: impl AsRef<[u8]>) -> u64 {
+pub fn hash<const SEED: i64>(value: impl AsRef<[u8]>) -> u32 {
     let mut h = GxHasher::with_seed(SEED);
     h.write(value.as_ref());
-    h.finish()
+    h.finish() as u32 | 0x80000000
 }
 
+/// Hash of a grouping key, seeded with `GX_HASH_SEED_HASHING`.
 #[inline]
-pub fn gx_merging_bucket_id(value: impl AsRef<[u8]>) -> u16 {
-    (gx_hash::<GX_HASH_SEED_MERGING>(value) % NUM_SPILL_BUCKETS as u64) as u16
+pub fn hashing_data_hash(value: impl AsRef<[u8]>) -> u32 {
+    hash::<GX_HASH_SEED_HASHING>(value)
 }
+
+/// Hash of a grouping key, seeded with `GX_HASH_SEED_MERGING`.
+#[inline]
+pub fn merging_data_hash(value: impl AsRef<[u8]>) -> u32 {
+    hash::<GX_HASH_SEED_MERGING>(value)
+}
+
+/// Spill bucket of a key. It is derived from the same hash as
+/// [`hashing_data_hash`], so it matches [`merging_bucket_id_from_hash`]
+/// applied to a hash produced by that function.
+#[inline]
+pub fn merging_bucket_id(value: impl AsRef<[u8]>) -> u16 {
+    merging_bucket_id_from_hash(hashing_data_hash(value))
+}
+
+/// Maps a key hash onto one of the `NUM_SPILL_BUCKETS` spill buckets.
+#[inline]
+pub fn merging_bucket_id_from_hash(hash: u32) -> u16 {
+    (hash % NUM_SPILL_BUCKETS as u32) as u16
+}
+
 pub struct HashingData {
     agg_ctx: Arc<AggContext>,
     task_ctx: Arc<TaskContext>,
     acc_store: AccStore,
     map_key_store: BytesArena,
-    map: RawTable<(BytesArenaAddr, u32)>, // keys addr to accs store addr
+    map: AggHashMap,
     num_input_records: usize,
     spill_metrics: SpillMetrics,
 }
@@ -576,61 +584,49 @@
     }
 
     fn num_records(&self) -> usize {
-        self.map.len()
+        self.acc_store.len()
     }
 
     fn cardinality_ratio(&self) -> f64 {
         let num_input_records = self.num_input_records;
-        let num_records = self.map.len();
+        let num_records = self.acc_store.len();
         num_records as f64 / num_input_records as f64
     }
 
     fn mem_used(&self) -> usize {
-        // including cost of sorting
-        self.map_key_store.mem_size() + self.acc_store.mem_size() + self.map.capacity() * 32
+        self.map_key_store.mem_size() + self.acc_store.mem_size() + self.map.mem_size()
     }
 
-    fn update_batch<const GX_HASH_SEED: i64>(&mut self, batch: RecordBatch) -> Result<()> {
+    fn update_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // account for the incoming rows, then pre-size the hash map before probing it
         let num_rows = batch.num_rows();
         self.num_input_records += num_rows;
+        self.map.ensure_capacity(self.num_records() + num_rows);
 
+        // compute hash
         let grouping_rows = self.agg_ctx.create_grouping_rows(&batch)?;
-        let hashes: Vec<u64> = grouping_rows
+        let hashes: Vec<u32> = grouping_rows
             .iter()
-            .map(|row| gx_hash::<GX_HASH_SEED>(row))
+            .map(|row| hashing_data_hash(row))
             .collect();
 
         // update hashmap
-        let mut acc_addrs = Vec::with_capacity(num_rows);
+        let mut acc_indices = Vec::with_capacity(num_rows);
         for (hash, row) in hashes.into_iter().zip(&grouping_rows) {
-            let found = self
-                .map
-                .find_or_find_insert_slot(
-                    hash,
-                    |v| {
-                        v.0.unpack().len == row.as_ref().len()
-                            && self.map_key_store.get(v.0) == row.as_ref()
-                    },
-                    |v| gx_hash::<GX_HASH_SEED>(self.map_key_store.get(v.0)),
-                )
-                .unwrap_or_else(|slot| {
-                    let key_addr = self.map_key_store.add(row.as_ref());
-                    let acc_addr = self.acc_store.new_acc();
-                    let entry = (key_addr, acc_addr);
-                    unsafe {
-                        // safety: inserting slot is ensured to be valid
-                        self.map.insert_in_slot(hash, slot, entry)
-                    }
-                });
-
-            acc_addrs.push(unsafe {
-                // safety: accessing hashbrown raw table
-                found.as_ref().1
-            });
+            let acc_idx = match self.map.lookup(&self.map_key_store, hash, &row) {
+                AggHashMapLookupEntry::Occupied(item) => item.acc_idx,
+                AggHashMapLookupEntry::Vaccant(item) => {
+                    item.non_zero_hash = hash;
+                    item.key_addr = self.map_key_store.add(row.as_ref());
+                    item.acc_idx = self.acc_store.new_acc();
+                    item.acc_idx
+                }
+            };
+            acc_indices.push(acc_idx);
         }
-        let mut accs = acc_addrs
+        let mut accs = acc_indices
             .into_iter()
-            .map(|acc_addr| self.acc_store.get(acc_addr))
+            .map(|acc_idx| self.acc_store.get(acc_idx))
             .collect::<Vec<_>>();
 
         // partial update
@@ -642,6 +638,7 @@
         let acc_array = self.agg_ctx.get_input_acc_array(&batch)?;
         self.agg_ctx
             .partial_batch_merge_input(&mut accs, acc_array)?;
+        // partial update and merge of this batch's accumulators are complete
         Ok(())
     }
 
@@ -650,28 +647,31 @@
         let mut bucketed_records = self
             .map
             .into_iter()
-            .map(|(key_addr, acc_addr)| {
-                let key = self.map_key_store.get(key_addr);
-                let acc = self.acc_store.get(acc_addr);
-                let bucket_id = gx_merging_bucket_id(key);
+            .map(|item| {
+                let key = self.map_key_store.get(item.key_addr);
+                let acc = self.acc_store.get(item.acc_idx);
+                let bucket_id = merging_bucket_id_from_hash(item.non_zero_hash);
                 (key, acc, bucket_id)
             })
             .collect::<Vec<_>>();
-
-        let bucket_counts =
-            radix_sort_u16_ranged_by(&mut bucketed_records, NUM_SPILL_BUCKETS, |v| v.2);
+        radix_sort_unstable_by_key(&mut bucketed_records, |v| v.2);
 
         let mut writer = spill.get_compressed_writer();
         let mut beg = 0;
 
         for i in 0..NUM_SPILL_BUCKETS {
-            if bucket_counts[i] > 0 {
+            let bucket_count = bucketed_records[beg..]
+                .iter()
+                .take_while(|(_, _, bucket_id)| *bucket_id == i as u16)
+                .count();
+
+            if bucket_count > 0 {
                 // write bucket id and number of records in this bucket
                 write_len(i, &mut writer)?;
-                write_len(bucket_counts[i], &mut writer)?;
+                write_len(bucket_count, &mut writer)?;
 
                 // write records in this bucket
-                for (key, acc, _) in &mut bucketed_records[beg..][..bucket_counts[i]] {
+                for (key, acc, _) in &mut bucketed_records[beg..][..bucket_count] {
                     // write key
                     let key = key.as_ref();
                     write_len(key.len(), &mut writer)?;
@@ -680,7 +680,7 @@
                     // write value
                     acc.save(&mut writer, &self.agg_ctx.acc_dyn_savers)?;
                 }
-                beg += bucket_counts[i];
+                beg += bucket_count;
             }
         }
         write_len(NUM_SPILL_BUCKETS, &mut writer)?; // EOF
@@ -757,15 +757,15 @@
             .map(|batch| self.agg_ctx.create_grouping_rows(batch))
             .collect::<Result<Vec<_>>>()?;
 
-        let acc_addrs = staging_batches
+        let acc_indices = staging_batches
             .iter()
             .map(|batch| {
-                let acc_addrs = (0..batch.num_rows())
+                let acc_indices = (0..batch.num_rows())
                     .map(|_| self.staging_acc_store.new_acc())
                     .collect::<Vec<_>>();
-                let mut accs = acc_addrs
+                let mut accs = acc_indices
                     .iter()
-                    .map(|&acc_addr| self.staging_acc_store.get(acc_addr))
+                    .map(|&acc_idx| self.staging_acc_store.get(acc_idx))
                     .collect::<Vec<_>>();
 
                 // partial update
@@ -777,7 +777,7 @@
                 let acc_array = self.agg_ctx.get_input_acc_array(&batch)?;
                 self.agg_ctx
                     .partial_batch_merge_input(&mut accs, acc_array)?;
-                Ok(acc_addrs)
+                Ok(acc_indices)
             })
             .collect::<Result<Vec<_>>>()?;
 
@@ -787,12 +787,12 @@
             .enumerate()
             .flat_map(|(batch_idx, rows)| {
                 rows.iter().enumerate().map(move |(row_idx, row)| {
-                    let bucket_id = gx_merging_bucket_id(&row);
+                    let bucket_id = merging_bucket_id(&row);
                     (batch_idx as u32, row_idx as u32, bucket_id)
                 })
             })
             .collect::<Vec<_>>();
-        radix_sort_u16_ranged_by(&mut sorted, NUM_SPILL_BUCKETS, |v| v.2);
+        radix_sort_unstable_by_key(&mut sorted, |v| v.2);
 
         // store serialized records
         // let acc_store = acc_store.lock();
@@ -805,7 +805,7 @@
             let row_idx = row_idx as usize;
             let grouping_row = grouping_rows[batch_idx].row(row_idx);
             let key = grouping_row.as_ref();
-            let mut acc = self.staging_acc_store.get(acc_addrs[batch_idx][row_idx]);
+            let mut acc = self.staging_acc_store.get(acc_indices[batch_idx][row_idx]);
 
             // serialize this record to temp_raw_record
             write_len(key.len(), &mut temp_raw_record)?;
diff --git a/native-engine/datafusion-ext-plans/src/agg/mod.rs b/native-engine/datafusion-ext-plans/src/agg/mod.rs
index 36cdd22..27f6b0e 100644
--- a/native-engine/datafusion-ext-plans/src/agg/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/mod.rs
@@ -14,6 +14,7 @@
 
 pub mod acc;
 pub mod agg_context;
+pub mod agg_hash_map;
 pub mod agg_table;
 pub mod avg;
 pub mod bloom_filter;
diff --git a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs
index 3f1ca6d..16d07e7 100644
--- a/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/broadcast_join_build_hash_map_exec.rs
@@ -18,7 +18,7 @@
     sync::Arc,
 };
 
-use arrow::{compute::concat_batches, datatypes::SchemaRef};
+use arrow::{array::RecordBatch, compute::concat_batches, datatypes::SchemaRef};
 use datafusion::{
     common::Result,
     execution::{SendableRecordBatchStream, TaskContext},
@@ -116,6 +116,18 @@
     }
 }
 
+/// Concatenates `data_batches` (with schema `data_schema`) into one batch and
+/// builds a [`JoinHashMap`] from it via `try_from_data_batch(.., &keys)`.
+pub fn collect_hash_map(
+    data_schema: SchemaRef,
+    data_batches: Vec<RecordBatch>,
+    keys: Vec<Arc<dyn PhysicalExpr>>,
+) -> Result<JoinHashMap> {
+    let merged = concat_batches(&data_schema, &data_batches)?;
+    let built = JoinHashMap::try_from_data_batch(merged, &keys)?;
+    Ok(built)
+}
+
 async fn execute_build_hash_map(
     context: Arc<TaskContext>,
     mut input: SendableRecordBatchStream,
 async fn execute_build_hash_map(
     context: Arc<TaskContext>,
     mut input: SendableRecordBatchStream,
@@ -132,11 +142,10 @@
     while let Some(batch) = input.next_batch(Some(&mut timer)).await? {
         data_batches.push(batch);
     }
-    let data_batch = concat_batches(&data_schema, data_batches.iter())?;
 
     // build hash map
     let hash_map_schema = join_hash_map_schema(&data_schema);
-    let hash_map = JoinHashMap::try_from_data_batch(data_batch, &keys)?;
+    let hash_map = collect_hash_map(data_schema, data_batches, keys)?;
     drop(timer);
 
     // output hash map batches as stream
diff --git a/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs b/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs
index 3467abb..86f4a14 100644
--- a/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/broadcast_join_exec.rs
@@ -23,19 +23,20 @@
 
 use arrow::{
     array::RecordBatch,
-    compute::SortOptions,
+    compute::{concat_batches, SortOptions},
     datatypes::{DataType, SchemaRef},
 };
 use async_trait::async_trait;
 use datafusion::{
-    common::{JoinSide, Result, Statistics},
+    common::{DataFusionError, JoinSide, Result, Statistics},
     execution::context::TaskContext,
     physical_expr::{PhysicalExprRef, PhysicalSortExpr},
     physical_plan::{
         joins::utils::JoinOn,
-        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, Time},
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time},
         stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning,
+        SendableRecordBatchStream,
     },
 };
 use datafusion_ext_commons::{
@@ -51,6 +52,7 @@
         batch_statisitcs::{stat_input, InputBatchStatistics},
         column_pruning::ExecuteWithColumnPruning,
         output::{TaskOutputter, WrappedRecordBatchSender},
+        timer_helper::TimerHelper,
     },
     joins::{
         bhj::{
@@ -65,7 +67,7 @@
                 RProbedRightSemiJoiner,
             },
         },
-        join_hash_map::{join_data_schema, JoinHashMap},
+        join_hash_map::{join_data_schema, join_hash_map_schema, JoinHashMap},
         join_utils::{JoinType, JoinType::*},
         JoinParams, JoinProjection,
     },
@@ -79,6 +81,7 @@
     join_type: JoinType,
     broadcast_side: JoinSide,
     schema: SchemaRef,
+    is_built: bool, // true for BroadcastHashJoin, false for ShuffledHashJoin
     cached_build_hash_map_id: Option<String>,
     metrics: ExecutionPlanMetricsSet,
 }
@@ -91,6 +94,7 @@
         on: JoinOn,
         join_type: JoinType,
         broadcast_side: JoinSide,
+        is_built: bool,
         cached_build_hash_map_id: Option<String>,
     ) -> Result<Self> {
         Ok(Self {
@@ -100,6 +104,7 @@
             join_type,
             broadcast_side,
             schema,
+            is_built,
             cached_build_hash_map_id,
             metrics: ExecutionPlanMetricsSet::new(),
         })
@@ -143,12 +148,12 @@
             self.join_type,
             &self.schema,
             &match self.broadcast_side {
-                JoinSide::Left => join_data_schema(&left_schema),
-                JoinSide::Right => left_schema.clone(),
+                JoinSide::Left if self.is_built => join_data_schema(&left_schema),
+                _ => left_schema.clone(),
             },
             &match self.broadcast_side {
-                JoinSide::Left => right_schema.clone(),
-                JoinSide::Right => join_data_schema(&right_schema),
+                JoinSide::Right if self.is_built => join_data_schema(&right_schema),
+                _ => right_schema.clone(),
             },
             projection,
         )?;
@@ -173,37 +178,42 @@
         context: Arc<TaskContext>,
         projection: Vec<usize>,
     ) -> Result<SendableRecordBatchStream> {
-        let metrics = Arc::new(BaselineMetrics::new(&self.metrics, partition));
+        let metrics = self.metrics.clone();
+        let baseline_metrics = Arc::new(BaselineMetrics::new(&metrics, partition));
         let join_params = self.create_join_params(&projection)?;
         let left = self.left.execute(partition, context.clone())?;
         let right = self.right.execute(partition, context.clone())?;
         let broadcast_side = self.broadcast_side;
+        let is_built = self.is_built;
         let cached_build_hash_map_id = self.cached_build_hash_map_id.clone();
 
         // stat probed side
         let input_batch_stat =
             InputBatchStatistics::from_metrics_set_and_blaze_conf(&self.metrics, partition)?;
-        let (left, right) = match broadcast_side {
-            JoinSide::Left => (left, stat_input(input_batch_stat, right)?),
-            JoinSide::Right => (stat_input(input_batch_stat, left)?, right),
-        };
+        let left = stat_input(input_batch_stat.clone(), left)?;
+        let right = stat_input(input_batch_stat.clone(), right)?;
 
-        let metrics_cloned = metrics.clone();
         let context_cloned = context.clone();
         let output_stream = Box::pin(RecordBatchStreamAdapter::new(
             join_params.projection.schema.clone(),
             futures::stream::once(async move {
                 context_cloned.output_with_sender(
-                    "BroadcastJoin",
+                    if is_built {
+                        "BroadcastJoin"
+                    } else {
+                        "HashJoin"
+                    },
                     join_params.projection.schema.clone(),
                     move |sender| {
                         execute_join(
+                            partition,
                             left,
                             right,
                             join_params,
                             broadcast_side,
                             cached_build_hash_map_id,
-                            metrics_cloned,
+                            is_built,
+                            metrics,
                             sender,
                         )
                     },
@@ -211,7 +221,7 @@
             })
             .try_flatten(),
         ));
-        Ok(context.coalesce_with_default_batch_size(output_stream, &metrics)?)
+        Ok(context.coalesce_with_default_batch_size(output_stream, &baseline_metrics)?)
     }
 }
 
@@ -261,6 +271,7 @@
             self.on.iter().cloned().collect(),
             self.join_type,
             self.broadcast_side,
+            self.is_built,
             None,
         )?))
     }
@@ -285,109 +296,70 @@
 
 impl DisplayAs for BroadcastJoinExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "BroadcastJoin")
+        write!(
+            f,
+            "{}",
+            if self.is_built {
+                "BroadcastJoin"
+            } else {
+                "HashJoin"
+            }
+        )
     }
 }
 
-async fn execute_join(
-    left: SendableRecordBatchStream,
-    right: SendableRecordBatchStream,
+async fn execute_join_with_map(
+    mut probed: SendableRecordBatchStream,
+    map: Arc<JoinHashMap>,
     join_params: JoinParams,
     broadcast_side: JoinSide,
-    cached_build_hash_map_id: Option<String>,
     metrics: Arc<BaselineMetrics>,
+    poll_time: Time,
+    build_time: Time,
     sender: Arc<WrappedRecordBatchSender>,
 ) -> Result<()> {
     let start_time = Instant::now();
     let mut excluded_time_ns = 0;
-    let poll_time = Time::new();
 
-    let (mut probed, _keys, mut joiner): (_, _, Pin<Box<dyn Joiner + Send>>) = match broadcast_side
-    {
-        JoinSide::Left => {
-            let right_schema = right.schema();
-            let mut right_peeked = Box::pin(right.peekable());
-            let (_, lmap_result) = futures::join!(
-                // fetch two sides asynchronously
-                async {
-                    let timer = poll_time.timer();
-                    right_peeked.as_mut().peek().await;
-                    drop(timer);
-                },
-                collect_join_hash_map(
-                    cached_build_hash_map_id,
-                    left,
-                    &join_params.left_keys,
-                    matches!(join_params.join_type, RightSemi | RightAnti),
-                    poll_time.clone(),
-                ),
-            );
-            let lmap = lmap_result?;
-            (
-                Box::pin(RecordBatchStreamAdapter::new(right_schema, right_peeked)),
-                join_params.right_keys.clone(),
-                match join_params.join_type {
-                    Inner => Box::pin(RProbedInnerJoiner::new(join_params, lmap, sender)),
-                    Left => Box::pin(RProbedLeftJoiner::new(join_params, lmap, sender)),
-                    Right => Box::pin(RProbedRightJoiner::new(join_params, lmap, sender)),
-                    Full => Box::pin(RProbedFullOuterJoiner::new(join_params, lmap, sender)),
-                    LeftSemi => Box::pin(RProbedLeftSemiJoiner::new(join_params, lmap, sender)),
-                    LeftAnti => Box::pin(RProbedLeftAntiJoiner::new(join_params, lmap, sender)),
-                    RightSemi => Box::pin(RProbedRightSemiJoiner::new(join_params, lmap, sender)),
-                    RightAnti => Box::pin(RProbedRightAntiJoiner::new(join_params, lmap, sender)),
-                    Existence => Box::pin(RProbedExistenceJoiner::new(join_params, lmap, sender)),
-                },
-            )
-        }
-        JoinSide::Right => {
-            let left_schema = left.schema();
-            let mut left_peeked = Box::pin(left.peekable());
-            let (_, rmap_result) = futures::join!(
-                // fetch two sides asynchronizely
-                async {
-                    let timer = poll_time.timer();
-                    left_peeked.as_mut().peek().await;
-                    drop(timer);
-                },
-                collect_join_hash_map(
-                    cached_build_hash_map_id,
-                    right,
-                    &join_params.right_keys,
-                    matches!(join_params.join_type, LeftSemi | LeftAnti | Existence),
-                    poll_time.clone(),
-                ),
-            );
-            let rmap = rmap_result?;
-            (
-                Box::pin(RecordBatchStreamAdapter::new(left_schema, left_peeked)),
-                join_params.left_keys.clone(),
-                match join_params.join_type {
-                    Inner => Box::pin(LProbedInnerJoiner::new(join_params, rmap, sender)),
-                    Left => Box::pin(LProbedLeftJoiner::new(join_params, rmap, sender)),
-                    Right => Box::pin(LProbedRightJoiner::new(join_params, rmap, sender)),
-                    Full => Box::pin(LProbedFullOuterJoiner::new(join_params, rmap, sender)),
-                    LeftSemi => Box::pin(LProbedLeftSemiJoiner::new(join_params, rmap, sender)),
-                    LeftAnti => Box::pin(LProbedLeftAntiJoiner::new(join_params, rmap, sender)),
-                    RightSemi => Box::pin(LProbedRightSemiJoiner::new(join_params, rmap, sender)),
-                    RightAnti => Box::pin(LProbedRightAntiJoiner::new(join_params, rmap, sender)),
-                    Existence => Box::pin(LProbedExistenceJoiner::new(join_params, rmap, sender)),
-                },
-            )
-        }
+    let mut joiner: Pin<Box<dyn Joiner + Send>> = match broadcast_side {
+        JoinSide::Left => match join_params.join_type {
+            Inner => Box::pin(RProbedInnerJoiner::new(join_params, map, sender)),
+            Left => Box::pin(RProbedLeftJoiner::new(join_params, map, sender)),
+            Right => Box::pin(RProbedRightJoiner::new(join_params, map, sender)),
+            Full => Box::pin(RProbedFullOuterJoiner::new(join_params, map, sender)),
+            LeftSemi => Box::pin(RProbedLeftSemiJoiner::new(join_params, map, sender)),
+            LeftAnti => Box::pin(RProbedLeftAntiJoiner::new(join_params, map, sender)),
+            RightSemi => Box::pin(RProbedRightSemiJoiner::new(join_params, map, sender)),
+            RightAnti => Box::pin(RProbedRightAntiJoiner::new(join_params, map, sender)),
+            Existence => Box::pin(RProbedExistenceJoiner::new(join_params, map, sender)),
+        },
+        JoinSide::Right => match join_params.join_type {
+            Inner => Box::pin(LProbedInnerJoiner::new(join_params, map, sender)),
+            Left => Box::pin(LProbedLeftJoiner::new(join_params, map, sender)),
+            Right => Box::pin(LProbedRightJoiner::new(join_params, map, sender)),
+            Full => Box::pin(LProbedFullOuterJoiner::new(join_params, map, sender)),
+            LeftSemi => Box::pin(LProbedLeftSemiJoiner::new(join_params, map, sender)),
+            LeftAnti => Box::pin(LProbedLeftAntiJoiner::new(join_params, map, sender)),
+            RightSemi => Box::pin(LProbedRightSemiJoiner::new(join_params, map, sender)),
+            RightAnti => Box::pin(LProbedRightAntiJoiner::new(join_params, map, sender)),
+            Existence => Box::pin(LProbedExistenceJoiner::new(join_params, map, sender)),
+        },
     };
 
-    while let Some(batch) = {
-        let timer = poll_time.timer();
-        let batch = probed.next().await.transpose()?;
-        drop(timer);
-        batch
-    } {
+    while let Some(batch) = poll_time
+        .with_timer_async(async { probed.next().await.transpose() })
+        .await?
+    {
         joiner.as_mut().join(batch).await?;
+        if joiner.can_early_stop() {
+            break;
+        }
     }
     joiner.as_mut().finish().await?;
     metrics.record_output(joiner.num_output_rows());
 
     excluded_time_ns += poll_time.value();
+    excluded_time_ns += build_time.value();
     excluded_time_ns += joiner.total_send_output_time();
 
     // discount poll input and send output batch time
@@ -399,6 +371,124 @@
     Ok(())
 }
 
+/// Joins `left` and `right` for one partition and sends the output through
+/// `sender`. The side named by `broadcast_side` supplies the join hash map and
+/// the other side is probed against it. With `is_built` the map is obtained
+/// via `collect_join_hash_map`; otherwise it is built by `build_join_hash_map`
+/// and the time spent is published as the `build_hash_map_time` metric.
+async fn execute_join(
+    partition: usize,
+    left: SendableRecordBatchStream,
+    right: SendableRecordBatchStream,
+    join_params: JoinParams,
+    broadcast_side: JoinSide,
+    cached_build_hash_map_id: Option<String>,
+    is_built: bool,
+    metrics: ExecutionPlanMetricsSet,
+    sender: Arc<WrappedRecordBatchSender>,
+) -> Result<()> {
+    let baseline_metrics = Arc::new(BaselineMetrics::new(&metrics, partition));
+    let poll_time = Time::new();
+    let build_time = Time::new();
+
+    if !is_built {
+        let build_time_value = MetricValue::Time {
+            name: "build_hash_map_time".into(),
+            time: build_time.clone(),
+        };
+        metrics.register(Arc::new(Metric::new(build_time_value, Some(partition))));
+    }
+
+    let distinct = match broadcast_side {
+        JoinSide::Left => matches!(join_params.join_type, RightSemi | RightAnti),
+        JoinSide::Right => matches!(join_params.join_type, LeftSemi | LeftAnti | Existence),
+    };
+    let map_keys = match broadcast_side {
+        JoinSide::Left => join_params.left_keys.clone(),
+        JoinSide::Right => join_params.right_keys.clone(),
+    };
+    let (probed_input, built_input) = match broadcast_side {
+        JoinSide::Left => (right, left),
+        JoinSide::Right => (left, right),
+    };
+
+    // poll both sides concurrently, so the probed side gets started while the map is prepared
+    let (probed, map) = futures::try_join!(
+        poll_time.with_timer_async(async {
+            let probed_schema = probed_input.schema();
+            let mut probed_peeked = Box::pin(probed_input.peekable());
+            probed_peeked.as_mut().peek().await;
+            let probed_stream = RecordBatchStreamAdapter::new(probed_schema, probed_peeked);
+            Ok(Box::pin(probed_stream))
+        }),
+        async {
+            if is_built {
+                collect_join_hash_map(
+                    cached_build_hash_map_id,
+                    built_input,
+                    &map_keys,
+                    distinct,
+                    poll_time.clone(),
+                )
+                .await
+            } else {
+                build_join_hash_map(
+                    built_input,
+                    &map_keys,
+                    distinct,
+                    poll_time.clone(),
+                    build_time.clone(),
+                )
+                .await
+            }
+        }
+    )?;
+
+    execute_join_with_map(
+        probed,
+        map,
+        join_params,
+        broadcast_side,
+        baseline_metrics,
+        poll_time,
+        build_time,
+        sender,
+    )
+    .await
+}
+
+/// Collects every batch of `input` and builds the join hash map over
+/// `key_exprs`, calling `distinct()` on it when `distinct` is set. Polling is
+/// timed by `poll_time`, concatenating and building by `build_time`.
+async fn build_join_hash_map(
+    input: SendableRecordBatchStream,
+    key_exprs: &[PhysicalExprRef],
+    distinct: bool,
+    poll_time: Time,
+    build_time: Time,
+) -> Result<Arc<JoinHashMap>> {
+    let data_schema = input.schema();
+    let hash_map_schema = join_hash_map_schema(&data_schema);
+    let data_batches: Vec<RecordBatch> = poll_time
+        .with_timer_async(async { Ok::<_, DataFusionError>(input.try_collect().await?) })
+        .await?;
+
+    let join_hash_map = build_time.with_timer(|| {
+        let data_batch = concat_batches(&data_schema, data_batches.iter())?;
+        let built = if data_batch.num_rows() == 0 {
+            JoinHashMap::try_new_empty(hash_map_schema, key_exprs)?
+        } else {
+            let mut non_empty = JoinHashMap::try_from_data_batch(data_batch, key_exprs)?;
+            if distinct {
+                non_empty.distinct()?;
+            }
+            non_empty
+        };
+        Ok::<_, DataFusionError>(Arc::new(built))
+    })?;
+    Ok(join_hash_map)
+}
+
 async fn collect_join_hash_map(
     cached_build_hash_map_id: Option<String>,
     input: SendableRecordBatchStream,
@@ -422,25 +512,21 @@
 }
 
 async fn collect_join_hash_map_without_caching(
-    mut input: SendableRecordBatchStream,
+    input: SendableRecordBatchStream,
     key_exprs: &[PhysicalExprRef],
     distinct: bool,
     poll_time: Time,
 ) -> Result<JoinHashMap> {
-    let mut hash_map_batches = vec![];
-    while let Some(batch) = {
-        let timer = poll_time.timer();
-        let batch = input.next().await.transpose()?;
-        drop(timer);
-        batch
-    } {
-        hash_map_batches.push(batch);
-    }
+    let hash_map_schema = input.schema();
+    let hash_map_batches: Vec<RecordBatch> = poll_time
+        .with_timer_async(async { Ok::<_, DataFusionError>(input.try_collect().await?) })
+        .await?;
+
     let mut join_hash_map = match hash_map_batches.len() {
-        0 => JoinHashMap::try_new_empty(input.schema(), key_exprs)?,
+        0 => JoinHashMap::try_new_empty(hash_map_schema, key_exprs)?,
         1 => {
             if hash_map_batches[0].num_rows() == 0 {
-                JoinHashMap::try_new_empty(input.schema(), key_exprs)?
+                JoinHashMap::try_new_empty(hash_map_schema, key_exprs)?
             } else {
                 JoinHashMap::try_from_hash_map_batch(hash_map_batches[0].clone(), key_exprs)?
             }
@@ -458,6 +544,10 @@
     async fn join(self: Pin<&mut Self>, probed_batch: RecordBatch) -> Result<()>;
     async fn finish(self: Pin<&mut Self>) -> Result<()>;
 
+    fn can_early_stop(&self) -> bool {
+        false
+    }
+
     fn total_send_output_time(&self) -> usize;
     fn num_output_rows(&self) -> usize;
 }
diff --git a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs b/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
index c0cbdea..af60e19 100644
--- a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
+++ b/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::{BufReader, Cursor, Read, Take, Write};
+use std::io::{BufReader, Read, Take, Write};
 
-use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use arrow::array::ArrayRef;
 use blaze_jni_bridge::{conf, conf::StringConf};
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use datafusion::common::Result;
@@ -48,13 +48,9 @@
     }
 
     /// Write a batch, returning uncompressed bytes size
-    pub fn write_batch(&mut self, batch: RecordBatch) -> Result<()> {
-        let mut batch_buf = vec![];
-        write_one_batch(&batch, &mut Cursor::new(&mut batch_buf))?;
-        self.buf.write_all(&mut batch_buf)?;
+    pub fn write_batch(&mut self, num_rows: usize, cols: &[ArrayRef]) -> Result<()> {
+        write_one_batch(num_rows, cols, &mut self.buf)?;
         self.buf_empty = false;
-        drop(batch_buf);
-
         if self.buf.buf_len() as f64 >= DEFAULT_SHUFFLE_COMPRESSION_TARGET_BUF_SIZE as f64 * 0.9 {
             self.flush()?;
         }
@@ -80,7 +76,6 @@
 }
 
 pub struct IpcCompressionReader<R: Read + 'static> {
-    schema: SchemaRef,
     input: InputState<R>,
 }
 unsafe impl<R: Read> Send for IpcCompressionReader<R> {}
@@ -94,14 +89,13 @@
 }
 
 impl<R: Read> IpcCompressionReader<R> {
-    pub fn new(input: R, schema: SchemaRef) -> Self {
+    pub fn new(input: R) -> Self {
         Self {
-            schema,
             input: InputState::BlockStart(input),
         }
     }
 
-    pub fn read_batch(&mut self) -> Result<Option<RecordBatch>> {
+    pub fn read_batch(&mut self) -> Result<Option<(usize, Vec<ArrayRef>)>> {
         struct Reader<'a, R: Read + 'static>(&'a mut IpcCompressionReader<R>);
         impl<'a, R: Read> Read for Reader<'a, R> {
             fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
@@ -130,8 +124,7 @@
                 }
             }
         }
-        let schema = self.schema.clone();
-        read_one_batch(&mut Reader(self), &schema)
+        read_one_batch(&mut Reader(self))
     }
 }
 
diff --git a/native-engine/datafusion-ext-plans/src/common/make_eq_comparator.rs b/native-engine/datafusion-ext-plans/src/common/make_eq_comparator.rs
new file mode 100644
index 0000000..d9eb526
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/common/make_eq_comparator.rs
@@ -0,0 +1,664 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! make_eq_comparator is derived from arrow-ord/50.0.0/src/arrow_ord/ord.rs
+
+use arrow::{
+    array::{cast::AsArray, types::*, *},
+    datatypes::ArrowNativeType,
+    error::ArrowError,
+};
+
+/// Compare the values at two arbitrary indices in two arrays.
+pub type DynEqComparator = Box<dyn Fn(usize, usize) -> bool + Send + Sync>;
+
+/// Wraps `eq` with null handling derived from the logical nulls of `l` and `r`.
+///
+/// * `l`, `r` - arrays whose logical nulls decide which slots are null; `l` is
+///   indexed by the first comparator argument, `r` by the second one.
+/// * `eq` - value comparison, only consulted when neither slot is null.
+///
+/// Returns the boxed comparator. A side without any logical null is not
+/// checked at all. Otherwise a null slot on either side makes the comparator
+/// answer `false` without calling `eq`, so a null never equals anything,
+/// another null included.
+fn eq_impl<A, F>(l: &A, r: &A, eq: F) -> DynEqComparator
+where
+    A: Array + Clone,
+    F: Fn(usize, usize) -> bool + Send + Sync + 'static,
+{
+    // keep a null buffer only when it actually marks some slot as null
+    let l_nulls = l.logical_nulls().filter(|nulls| nulls.null_count() > 0);
+    let r_nulls = r.logical_nulls().filter(|nulls| nulls.null_count() > 0);
+
+    // each arm captures just the null buffers it needs
+    match (l_nulls, r_nulls) {
+        (None, None) => Box::new(eq),
+        (Some(l_nulls), None) => Box::new(move |i, j| !l_nulls.is_null(i) && eq(i, j)),
+        (None, Some(r_nulls)) => Box::new(move |i, j| !r_nulls.is_null(j) && eq(i, j)),
+        (Some(l_nulls), Some(r_nulls)) => Box::new(move |i, j| {
+            let both_valid = !l_nulls.is_null(i) && !r_nulls.is_null(j);
+            both_valid && eq(i, j)
+        }),
+    }
+}
+
+/// Equality comparator for primitive arrays: values are compared with the
+/// native `==` of `T::Native`, nulls are handled by `eq_impl`.
+fn eq_primitive<T: ArrowPrimitiveType>(left: &dyn Array, right: &dyn Array) -> DynEqComparator
+where
+    T::Native: ArrowNativeTypeOp,
+{
+    let (lhs, rhs) = (left.as_primitive::<T>(), right.as_primitive::<T>());
+    let (lhs_values, rhs_values) = (lhs.values().clone(), rhs.values().clone());
+    eq_impl(lhs, rhs, move |i, j| lhs_values[i] == rhs_values[j])
+}
+
+/// Equality comparator for boolean arrays: the value bits of the two slots
+/// are compared, nulls are handled by `eq_impl`.
+fn eq_boolean(left: &dyn Array, right: &dyn Array) -> DynEqComparator {
+    let (lhs, rhs) = (left.as_boolean(), right.as_boolean());
+    let lhs_bits = lhs.values().clone();
+    let rhs_bits = rhs.values().clone();
+
+    // the closure owns its copies of the value buffers, as `eq_impl` asks
+    // for a `'static` comparison function
+    eq_impl(lhs, rhs, move |i, j| lhs_bits.value(i) == rhs_bits.value(j))
+}
+
+/// Equality comparator for variable-length byte arrays (string and binary):
+/// two slots are equal when their raw bytes are equal; nulls are handled by
+/// `eq_impl`.
+fn eq_bytes<T: ByteArrayType>(left: &dyn Array, right: &dyn Array) -> DynEqComparator {
+    let (lhs, rhs) = (left.as_bytes::<T>(), right.as_bytes::<T>());
+    let (lhs_owned, rhs_owned) = (lhs.clone(), rhs.clone());
+    eq_impl(lhs, rhs, move |i, j| {
+        let lhs_bytes: &[u8] = lhs_owned.value(i).as_ref();
+        let rhs_bytes: &[u8] = rhs_owned.value(j).as_ref();
+        lhs_bytes == rhs_bytes
+    })
+}
+
+/// Equality comparator for dictionary arrays keyed by `K`: each slot is
+/// resolved through its key and the referenced dictionary values are compared.
+///
+/// Fails when the two value arrays cannot be compared with each other.
+fn compare_dict<K: ArrowDictionaryKeyType>(
+    left: &dyn Array,
+    right: &dyn Array,
+) -> Result<DynEqComparator, ArrowError> {
+    let (lhs, rhs) = (left.as_dictionary::<K>(), right.as_dictionary::<K>());
+    let values_eq = make_eq_comparator(lhs.values().as_ref(), rhs.values().as_ref())?;
+
+    let lhs_keys = lhs.keys().values().clone();
+    let rhs_keys = rhs.keys().values().clone();
+    Ok(eq_impl(lhs, rhs, move |i, j| {
+        // null slots never get here: `eq_impl` filters them out first
+        values_eq(lhs_keys[i].as_usize(), rhs_keys[j].as_usize())
+    }))
+}
+
+/// Equality comparator for list arrays with offset type `O`.
+///
+/// Two non-null slots are equal when they hold the same number of child
+/// elements and all of those elements are pairwise equal according to the
+/// comparator of the child value arrays.
+///
+/// Fails when the child value arrays cannot be compared with each other.
+fn eq_list<O: OffsetSizeTrait>(
+    left: &dyn Array,
+    right: &dyn Array,
+) -> Result<DynEqComparator, ArrowError> {
+    let left = left.as_list::<O>();
+    let right = right.as_list::<O>();
+
+    let eq = make_eq_comparator(left.values().as_ref(), right.values().as_ref())?;
+
+    let l_o = left.offsets().clone();
+    let r_o = right.offsets().clone();
+    let f = eq_impl(left, right, move |i, j| {
+        let l_range = l_o[i].as_usize()..l_o[i + 1].as_usize();
+        let r_range = r_o[j].as_usize()..r_o[j + 1].as_usize();
+
+        // lists of different lengths can never be equal, so reject them
+        // before touching any child element
+        l_range.len() == r_range.len() && l_range.zip(r_range).all(|(i, j)| eq(i, j))
+    });
+    Ok(f)
+}
+
+/// Equality comparator for fixed-size list arrays.
+///
+/// Two non-null slots are equal when both arrays use the same list size and
+/// all child elements are pairwise equal. Fails when the child arrays cannot
+/// be compared with each other.
+fn eq_fixed_list(left: &dyn Array, right: &dyn Array) -> Result<DynEqComparator, ArrowError> {
+    let left = left.as_fixed_size_list();
+    let right = right.as_fixed_size_list();
+    let eq = make_eq_comparator(left.values().as_ref(), right.values().as_ref())?;
+
+    let l_size = left.value_length().to_usize().unwrap();
+    let r_size = right.value_length().to_usize().unwrap();
+    let size_eq = l_size == r_size;
+
+    let f = eq_impl(left, right, move |i, j| {
+        // slot `k` owns the child range `k * size..(k + 1) * size`
+        let l_range = i * l_size..(i + 1) * l_size;
+        let r_range = j * r_size..(j + 1) * r_size;
+
+        // differently sized lists are never equal; skip the element scan
+        size_eq && l_range.zip(r_range).all(|(i, j)| eq(i, j))
+    });
+    Ok(f)
+}
+
+/// Equality comparator for struct arrays.
+///
+/// Two non-null slots are equal when every pair of child columns compares
+/// equal at those slots. Fails when the arrays have a different number of
+/// columns or when a pair of child columns cannot be compared.
+fn eq_struct(left: &dyn Array, right: &dyn Array) -> Result<DynEqComparator, ArrowError> {
+    let left = left.as_struct();
+    let right = right.as_struct();
+
+    if left.columns().len() != right.columns().len() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Cannot compare StructArray with different number of columns".to_string(),
+        ));
+    }
+
+    // one comparator per child column, built up front so that comparing two
+    // slots only has to call the prepared closures
+    let comparators = left
+        .columns()
+        .iter()
+        .zip(right.columns())
+        .map(|(l, r)| make_eq_comparator(l, r))
+        .collect::<Result<Vec<_>, _>>()?;
+
+    Ok(eq_impl(left, right, move |i, j| comparators.iter().all(|eq| eq(i, j))))
+}
+
+/// Builds a comparator telling whether slot `i` of `left` equals slot `j` of
+/// `right`; a null slot never equals anything. Returns an error when the two
+/// arrays have different data types or the data type is not supported.
+pub fn make_eq_comparator(
+    left: &dyn Array,
+    right: &dyn Array,
+) -> Result<DynEqComparator, ArrowError> {
+    use arrow::{datatypes as arrow_schema, datatypes::DataType::*};
+    macro_rules! primitive_helper {
+        ($t:ty, $left:expr, $right:expr) => {
+            Ok(eq_primitive::<$t>($left, $right))
+        };
+    }
+    downcast_primitive! {
+        left.data_type(), right.data_type() => (primitive_helper, left, right),
+        (Boolean, Boolean) => Ok(eq_boolean(left, right)),
+        (Utf8, Utf8) => Ok(eq_bytes::<Utf8Type>(left, right)),
+        (LargeUtf8, LargeUtf8) => Ok(eq_bytes::<LargeUtf8Type>(left, right)),
+        (Binary, Binary) => Ok(eq_bytes::<BinaryType>(left, right)),
+        (LargeBinary, LargeBinary) => Ok(eq_bytes::<LargeBinaryType>(left, right)),
+        (FixedSizeBinary(_), FixedSizeBinary(_)) => {
+            let (left, right) = (left.as_fixed_size_binary(), right.as_fixed_size_binary());
+            let (l, r) = (left.clone(), right.clone());
+            Ok(eq_impl(left, right, move |i, j| l.value(i) == r.value(j)))
+        },
+        (List(_), List(_)) => eq_list::<i32>(left, right),
+        (LargeList(_), LargeList(_)) => eq_list::<i64>(left, right),
+        (FixedSizeList(_, _), FixedSizeList(_, _)) => eq_fixed_list(left, right),
+        (Struct(_), Struct(_)) => eq_struct(left, right),
+        (Dictionary(l_key, _), Dictionary(r_key, _)) => {
+            macro_rules! dict_helper {
+                ($t:ty, $left:expr, $right:expr) => {
+                    compare_dict::<$t>($left, $right)
+                };
+            }
+            downcast_integer! {
+                l_key.as_ref(), r_key.as_ref() => (dict_helper, left, right),
+                // dictionaries keyed by different integer types: report it instead of panicking
+                _ => Err(ArrowError::InvalidArgumentError(
+                    "Can't compare dictionaries with different key types".to_string(),
+                )),
+            }
+        },
+        (lhs, rhs) => Err(ArrowError::InvalidArgumentError(match lhs == rhs {
+            true => format!("The data type {lhs:?} does not support equality comparison"),
+            false => "Can't compare arrays of different types".to_string(),
+        }))
+    }
+}
+
+#[cfg(test)]
+pub mod tests {
+    use std::sync::Arc;
+
+    use arrow::{
+        array::builder::{Int32Builder, ListBuilder},
+        buffer::{NullBuffer, OffsetBuffer},
+        datatypes::{i256, DataType, Field, Fields},
+    };
+
+    use super::*;
+
+    #[test]
+    fn test_fixed_size_binary() {
+        let items = vec![vec![1u8], vec![2u8]];
+        let array = FixedSizeBinaryArray::try_from_iter(items.into_iter()).unwrap();
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+
+        assert_eq!(false, eq(0, 1));
+    }
+
+    #[test]
+    fn test_fixed_size_binary_fixed_size_binary() {
+        let items = vec![vec![1u8]];
+        let array1 = FixedSizeBinaryArray::try_from_iter(items.into_iter()).unwrap();
+        let items = vec![vec![2u8]];
+        let array2 = FixedSizeBinaryArray::try_from_iter(items.into_iter()).unwrap();
+
+        let eq = make_eq_comparator(&array1, &array2).unwrap();
+
+        assert_eq!(false, eq(0, 0));
+    }
+
+    #[test]
+    fn test_i32() {
+        let array = Int32Array::from(vec![1, 2]);
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+
+        assert_eq!(false, (eq)(0, 1));
+    }
+
+    #[test]
+    fn test_i32_i32() {
+        let array1 = Int32Array::from(vec![1]);
+        let array2 = Int32Array::from(vec![2]);
+
+        let eq = make_eq_comparator(&array1, &array2).unwrap();
+
+        assert_eq!(false, eq(0, 0));
+    }
+
+    #[test]
+    fn test_f64() {
+        let array = Float64Array::from(vec![1.0, 2.0]);
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+
+        assert_eq!(false, eq(0, 1));
+    }
+
+    #[test]
+    fn test_f64_nan() {
+        let array = Float64Array::from(vec![1.0, f64::NAN]);
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+
+        assert_eq!(true, eq(0, 0));
+        assert_eq!(false, eq(0, 1));
+        assert_eq!(false, eq(1, 1)); // NaN != NaN
+    }
+
+    #[test]
+    fn test_f64_zeros() {
+        let array = Float64Array::from(vec![-0.0, 0.0]);
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+
+        assert_eq!(true, eq(0, 1)); // -0.0 == 0.0
+        assert_eq!(true, eq(1, 0));
+    }
+
+    #[test]
+    fn test_interval_day_time() {
+        let array = IntervalDayTimeArray::from(vec![
+            // 0 days, 1 second
+            IntervalDayTimeType::make_value(0, 1000),
+            // 1 day, 2 milliseconds
+            IntervalDayTimeType::make_value(1, 2),
+            // 90M milliseconds (which is more than is in 1 day)
+            IntervalDayTimeType::make_value(0, 90_000_000),
+        ]);
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+
+        assert_eq!(false, eq(0, 1));
+        assert_eq!(false, eq(1, 0));
+
+        // the comparison is done on the underlying native values and not on
+        // normalized durations, so 1 day + 2 milliseconds and 90M
+        // milliseconds are simply two different values
+        assert_eq!(false, eq(1, 2));
+        assert_eq!(false, eq(2, 1));
+    }
+
+    #[test]
+    fn test_interval_year_month() {
+        let array = IntervalYearMonthArray::from(vec![
+            // 1 year, 0 months
+            IntervalYearMonthType::make_value(1, 0),
+            // 0 years, 13 months
+            IntervalYearMonthType::make_value(0, 13),
+            // 1 year, 1 month
+            IntervalYearMonthType::make_value(1, 1),
+        ]);
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+
+        assert_eq!(false, eq(0, 1));
+        assert_eq!(false, eq(1, 0));
+
+        // the underlying representation is months, so both quantities are the same
+        assert_eq!(true, eq(1, 2));
+        assert_eq!(true, eq(2, 1));
+    }
+
+    #[test]
+    fn test_interval_month_day_nano() {
+        let array = IntervalMonthDayNanoArray::from(vec![
+            // 100 days
+            IntervalMonthDayNanoType::make_value(0, 100, 0),
+            // 1 month
+            IntervalMonthDayNanoType::make_value(1, 0, 0),
+            // 100 days, 2 nanoseconds
+            IntervalMonthDayNanoType::make_value(0, 100, 2),
+        ]);
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+
+        assert_eq!(false, eq(0, 1));
+        assert_eq!(false, eq(1, 0));
+
+        // 100 days is more than 1 month in all cases; as the comparison is done
+        // on the underlying values, not field by field, the two intervals are
+        // simply reported as not equal
+        assert_eq!(false, eq(1, 2));
+        assert_eq!(false, eq(2, 1));
+    }
+
+    #[test]
+    fn test_decimal() {
+        let array = vec![Some(5_i128), Some(2_i128), Some(3_i128)]
+            .into_iter()
+            .collect::<Decimal128Array>()
+            .with_precision_and_scale(23, 6)
+            .unwrap();
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+        assert_eq!(false, eq(1, 0));
+        assert_eq!(false, eq(0, 2));
+    }
+
+    #[test]
+    fn test_decimali256() {
+        let array = vec![
+            Some(i256::from_i128(5_i128)),
+            Some(i256::from_i128(2_i128)),
+            Some(i256::from_i128(3_i128)),
+        ]
+        .into_iter()
+        .collect::<Decimal256Array>()
+        .with_precision_and_scale(53, 6)
+        .unwrap();
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+        assert_eq!(false, eq(1, 0));
+        assert_eq!(false, eq(0, 2));
+    }
+
+    #[test]
+    fn test_dict() {
+        let data = vec!["a", "b", "c", "a", "a", "c", "c"];
+        let array = data.into_iter().collect::<DictionaryArray<Int16Type>>();
+
+        let eq = make_eq_comparator(&array, &array).unwrap();
+
+        assert_eq!(false, eq(0, 1));
+        assert_eq!(true, eq(3, 4));
+        assert_eq!(false, eq(2, 3));
+    }
+
+    #[test]
+    fn test_multiple_dict() {
+        let d1 = vec!["a", "b", "c", "d"];
+        let a1 = d1.into_iter().collect::<DictionaryArray<Int16Type>>();
+        let d2 = vec!["e", "f", "g", "a"];
+        let a2 = d2.into_iter().collect::<DictionaryArray<Int16Type>>();
+
+        let eq = make_eq_comparator(&a1, &a2).unwrap();
+
+        assert_eq!(false, eq(0, 0));
+        assert_eq!(true, eq(0, 3));
+        assert_eq!(false, eq(1, 3));
+    }
+
+    #[test]
+    fn test_primitive_dict() {
+        let values = Int32Array::from(vec![1_i32, 0, 2, 5]);
+        let keys = Int8Array::from_iter_values([0, 0, 1, 3]);
+        let array1 = DictionaryArray::new(keys, Arc::new(values));
+
+        let values = Int32Array::from(vec![2_i32, 3, 4, 5]);
+        let keys = Int8Array::from_iter_values([0, 1, 1, 3]);
+        let array2 = DictionaryArray::new(keys, Arc::new(values));
+
+        let eq = make_eq_comparator(&array1, &array2).unwrap();
+
+        assert_eq!(false, eq(0, 0));
+        assert_eq!(false, eq(0, 3));
+        assert_eq!(true, eq(3, 3));
+        assert_eq!(false, eq(3, 1));
+        assert_eq!(false, eq(3, 2));
+    }
+
+    #[test]
+    fn test_float_dict() {
+        let values = Float32Array::from(vec![1.0, 0.5, 2.1, 5.5]);
+        let keys = Int8Array::from_iter_values([0, 0, 1, 3]);
+        let array1 = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();
+
+        let values = Float32Array::from(vec![1.2, 3.2, 4.0, 5.5]);
+        let keys = Int8Array::from_iter_values([0, 1, 1, 3]);
+        let array2 = DictionaryArray::new(keys, Arc::new(values));
+
+        let eq = make_eq_comparator(&array1, &array2).unwrap();
+
+        assert_eq!(false, eq(0, 0));
+        assert_eq!(false, eq(0, 3));
+        assert_eq!(true, eq(3, 3));
+        assert_eq!(false, eq(3, 1));
+        assert_eq!(false, eq(3, 2));
+    }
+
+    #[test]
+    fn test_timestamp_dict() {
+        let values = TimestampSecondArray::from(vec![1, 0, 2, 5]);
+        let keys = Int8Array::from_iter_values([0, 0, 1, 3]);
+        let array1 = DictionaryArray::new(keys, Arc::new(values));
+
+        let values = TimestampSecondArray::from(vec![2, 3, 4, 5]);
+        let keys = Int8Array::from_iter_values([0, 1, 1, 3]);
+        let array2 = DictionaryArray::new(keys, Arc::new(values));
+
+        let eq = make_eq_comparator(&array1, &array2).unwrap();
+
+        assert_eq!(false, eq(0, 0));
+        assert_eq!(false, eq(0, 3));
+        assert_eq!(true, eq(3, 3));
+        assert_eq!(false, eq(3, 1));
+        assert_eq!(false, eq(3, 2));
+    }
+
+    #[test]
+    fn test_duration_dict() {
+        let values = DurationSecondArray::from(vec![1, 0, 2, 5]);
+        let keys = Int8Array::from_iter_values([0, 0, 1, 3]);
+        let array1 = DictionaryArray::new(keys, Arc::new(values));
+
+        let values = DurationSecondArray::from(vec![2, 3, 4, 5]);
+        let keys = Int8Array::from_iter_values([0, 1, 1, 3]);
+        let array2 = DictionaryArray::new(keys, Arc::new(values));
+
+        let eq = make_eq_comparator(&array1, &array2).unwrap();
+
+        assert_eq!(false, eq(0, 0));
+        assert_eq!(false, eq(0, 3));
+        assert_eq!(true, eq(3, 3));
+        assert_eq!(false, eq(3, 1));
+        assert_eq!(false, eq(3, 2));
+    }
+
+    #[test]
+    fn test_decimal_dict() {
+        let values = Decimal128Array::from(vec![1, 0, 2, 5]);
+        let keys = Int8Array::from_iter_values([0, 0, 1, 3]);
+        let array1 = DictionaryArray::new(keys, Arc::new(values));
+
+        let values = Decimal128Array::from(vec![2, 3, 4, 5]);
+        let keys = Int8Array::from_iter_values([0, 1, 1, 3]);
+        let array2 = DictionaryArray::new(keys, Arc::new(values));
+
+        let eq = make_eq_comparator(&array1, &array2).unwrap();
+
+        assert_eq!(false, eq(0, 0));
+        assert_eq!(false, eq(0, 3));
+        assert_eq!(true, eq(3, 3));
+        assert_eq!(false, eq(3, 1));
+        assert_eq!(false, eq(3, 2));
+    }
+
+    #[test]
+    fn test_decimal256_dict() {
+        let values = Decimal256Array::from(vec![
+            i256::from_i128(1),
+            i256::from_i128(0),
+            i256::from_i128(2),
+            i256::from_i128(5),
+        ]);
+        let keys = Int8Array::from_iter_values([0, 0, 1, 3]);
+        let array1 = DictionaryArray::new(keys, Arc::new(values));
+
+        let values = Decimal256Array::from(vec![
+            i256::from_i128(2),
+            i256::from_i128(3),
+            i256::from_i128(4),
+            i256::from_i128(5),
+        ]);
+        let keys = Int8Array::from_iter_values([0, 1, 1, 3]);
+        let array2 = DictionaryArray::new(keys, Arc::new(values));
+
+        let eq = make_eq_comparator(&array1, &array2).unwrap();
+
+        assert_eq!(false, eq(0, 0));
+        assert_eq!(false, eq(0, 3));
+        assert_eq!(true, eq(3, 3));
+        assert_eq!(false, eq(3, 1));
+        assert_eq!(false, eq(3, 2));
+    }
+
+    fn test_bytes_impl<T: ByteArrayType>() {
+        let offsets = OffsetBuffer::from_lengths([3, 3, 1]);
+        let a = GenericByteArray::<T>::new(offsets, b"abcdefa".into(), None);
+        let eq = make_eq_comparator(&a, &a).unwrap();
+
+        assert_eq!(false, eq(0, 1));
+        assert_eq!(false, eq(0, 2));
+        assert_eq!(true, eq(1, 1));
+    }
+
+    #[test]
+    fn test_bytes() {
+        test_bytes_impl::<Utf8Type>();
+        test_bytes_impl::<LargeUtf8Type>();
+        test_bytes_impl::<BinaryType>();
+        test_bytes_impl::<LargeBinaryType>();
+    }
+
+    #[test]
+    fn test_lists() {
+        let mut a = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+        a.extend([
+            Some(vec![Some(vec![Some(1), Some(2), None]), Some(vec![None])]),
+            Some(vec![
+                Some(vec![Some(1), Some(2), Some(3)]),
+                Some(vec![Some(1)]),
+            ]),
+            Some(vec![]),
+            None,
+            Some(vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(1)])]),
+        ]);
+        let a = a.finish();
+        let mut b = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+        b.extend([
+            Some(vec![Some(vec![Some(1), Some(2), None]), Some(vec![None])]),
+            Some(vec![
+                Some(vec![Some(1), Some(2), None]),
+                Some(vec![Some(1)]),
+            ]),
+            Some(vec![
+                Some(vec![Some(1), Some(2), Some(3), Some(4)]),
+                Some(vec![Some(1)]),
+            ]),
+            None,
+            Some(vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(1)])]),
+        ]);
+        let b = b.finish();
+
+        let eq = make_eq_comparator(&a, &b).unwrap();
+        assert_eq!(eq(0, 0), false); // lists containing nulls are never equal
+        assert_eq!(eq(0, 1), false);
+        assert_eq!(eq(0, 2), false);
+        assert_eq!(eq(1, 2), false);
+        assert_eq!(eq(1, 3), false);
+        assert_eq!(eq(2, 0), false);
+        assert_eq!(eq(4, 4), true);
+    }
+
+    #[test]
+    fn test_struct() {
+        let fields = Fields::from(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new_list("b", Field::new("item", DataType::Int32, true), true),
+        ]);
+
+        let a = Int32Array::from(vec![Some(1), Some(2), None, None]);
+        let mut b = ListBuilder::new(Int32Builder::new());
+        b.extend([Some(vec![Some(1), Some(2)]), Some(vec![None]), None, None]);
+        let b = b.finish();
+
+        let nulls = Some(NullBuffer::from_iter([true, true, true, false]));
+        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
+        let s1 = StructArray::new(fields.clone(), values, nulls);
+
+        let a = Int32Array::from(vec![None, Some(2), None]);
+        let mut b = ListBuilder::new(Int32Builder::new());
+        b.extend([None, None, Some(vec![])]);
+        let b = b.finish();
+
+        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
+        let s2 = StructArray::new(fields.clone(), values, None);
+
+        let eq = make_eq_comparator(&s1, &s2).unwrap();
+        assert_eq!(eq(0, 1), false); // (1, [1, 2]) eq (2, None)
+        assert_eq!(eq(0, 0), false); // (1, [1, 2]) eq (None, None)
+        assert_eq!(eq(1, 1), false); // (2, [None]) eq (2, None)
+        assert_eq!(eq(2, 2), false); // (None, None) eq (None, [])
+        assert_eq!(eq(3, 0), false); // None eq (None, None)
+        assert_eq!(eq(2, 0), false); // (None, None) eq (None, None)
+        assert_eq!(eq(3, 0), false); // None eq (None, None)
+    }
+}
diff --git a/native-engine/datafusion-ext-plans/src/common/mod.rs b/native-engine/datafusion-ext-plans/src/common/mod.rs
index 41436dd..2dee86c 100644
--- a/native-engine/datafusion-ext-plans/src/common/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/common/mod.rs
@@ -17,4 +17,6 @@
 pub mod cached_exprs_evaluator;
 pub mod column_pruning;
 pub mod ipc_compression;
+pub mod make_eq_comparator;
 pub mod output;
+pub mod timer_helper;
diff --git a/native-engine/datafusion-ext-plans/src/common/output.rs b/native-engine/datafusion-ext-plans/src/common/output.rs
index d888026..73ba510 100644
--- a/native-engine/datafusion-ext-plans/src/common/output.rs
+++ b/native-engine/datafusion-ext-plans/src/common/output.rs
@@ -14,7 +14,6 @@
 
 use std::{
     future::Future,
-    io::{Cursor, Write},
     panic::AssertUnwindSafe,
     sync::{Arc, Weak},
 };
@@ -33,7 +32,7 @@
 };
 use datafusion_ext_commons::{
     df_execution_err,
-    io::{read_one_batch, write_one_batch},
+    io::{read_one_batch, recover_named_batch, write_one_batch},
 };
 use futures::{FutureExt, StreamExt};
 use once_cell::sync::OnceCell;
@@ -196,16 +195,15 @@
                     // write all batches to spill, releasing all holding memory
                     while let Some(batch) = stream.next().await.transpose()? {
                         let _timer = baseline_metrics.elapsed_compute().timer();
-                        let mut buf = vec![];
-                        write_one_batch(&batch, &mut Cursor::new(&mut buf))?;
-                        spill_writer.write_all(&buf)?;
+                        write_one_batch(batch.num_rows(), batch.columns(), &mut spill_writer)?;
                     }
                     let mut timer = baseline_metrics.elapsed_compute().timer();
                     drop(spill_writer);
 
                     // read all batches from spill and output
                     let mut spill_reader = spill.get_compressed_reader();
-                    while let Some(batch) = read_one_batch(&mut spill_reader, &schema)? {
+                    while let Some((num_rows, cols)) = read_one_batch(&mut spill_reader)? {
+                        let batch = recover_named_batch(num_rows, &cols, schema.clone())?;
                         sender.send(Ok(batch), Some(&mut timer)).await;
                     }
                     return Ok(());
diff --git a/native-engine/datafusion-ext-plans/src/common/timer_helper.rs b/native-engine/datafusion-ext-plans/src/common/timer_helper.rs
new file mode 100644
index 0000000..7ee407e
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/common/timer_helper.rs
@@ -0,0 +1,37 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{future::Future, time::Instant};
+
+use datafusion::physical_plan::metrics::Time;
+use futures::{future::BoxFuture, FutureExt};
+
+pub trait TimerHelper {
+    fn with_timer<T>(&self, f: impl FnOnce() -> T) -> T;
+    fn with_timer_async<'a, T>(&'a self, f: impl Future<Output = T> + Send + 'a) -> BoxFuture<T>;
+}
+
+impl TimerHelper for Time {
+    fn with_timer<T>(&self, f: impl FnOnce() -> T) -> T {
+        let _timer = self.timer();
+        f()
+    }
+
+    fn with_timer_async<'a, T>(&'a self, f: impl Future<Output = T> + Send + 'a) -> BoxFuture<T> {
+        let time = self.clone();
+        let start_time = Instant::now();
+        f.inspect(move |_| time.add_duration(start_time.elapsed()))
+            .boxed()
+    }
+}
diff --git a/native-engine/datafusion-ext-plans/src/generate/spark_udtf_wrapper.rs b/native-engine/datafusion-ext-plans/src/generate/spark_udtf_wrapper.rs
index ac5e995..1a0ba12 100644
--- a/native-engine/datafusion-ext-plans/src/generate/spark_udtf_wrapper.rs
+++ b/native-engine/datafusion-ext-plans/src/generate/spark_udtf_wrapper.rs
@@ -18,7 +18,7 @@
 };
 
 use arrow::{
-    array::{make_array, Array, ArrayRef, AsArray, Int32Array, RecordBatch, RecordBatchOptions},
+    array::{make_array, ArrayRef, AsArray, Int32Array, RecordBatch, RecordBatchOptions},
     datatypes::{DataType, Field, Schema, SchemaRef},
     ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
 };
diff --git a/native-engine/datafusion-ext-plans/src/hash_join_exec.rs b/native-engine/datafusion-ext-plans/src/hash_join_exec.rs
deleted file mode 100644
index 14ca9b0..0000000
--- a/native-engine/datafusion-ext-plans/src/hash_join_exec.rs
+++ /dev/null
@@ -1,172 +0,0 @@
-// Copyright 2022 The Blaze Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::{
-    any::Any,
-    fmt::{Debug, Formatter},
-    sync::Arc,
-    task::Poll,
-    time::Duration,
-};
-
-use arrow::datatypes::SchemaRef;
-use datafusion::{
-    common::{JoinSide, Result},
-    execution::{SendableRecordBatchStream, TaskContext},
-    physical_expr::{Partitioning, PhysicalSortExpr},
-    physical_plan::{
-        joins::utils::JoinOn,
-        metrics::{MetricValue, MetricsSet},
-        stream::RecordBatchStreamAdapter,
-        DisplayAs, DisplayFormatType, ExecutionPlan,
-    },
-};
-use datafusion_ext_commons::downcast_any;
-use futures::{stream::poll_fn, StreamExt};
-
-use crate::{
-    broadcast_join_build_hash_map_exec::BroadcastJoinBuildHashMapExec,
-    broadcast_join_exec::BroadcastJoinExec, joins::join_utils::JoinType,
-};
-
-#[derive(Debug)]
-pub struct HashJoinExec {
-    left: Arc<dyn ExecutionPlan>,
-    right: Arc<dyn ExecutionPlan>,
-    wrapped_bhj: Arc<dyn ExecutionPlan>,
-}
-
-impl HashJoinExec {
-    pub fn try_new(
-        schema: SchemaRef,
-        left: Arc<dyn ExecutionPlan>,
-        right: Arc<dyn ExecutionPlan>,
-        on: JoinOn,
-        join_type: JoinType,
-        build_side: JoinSide,
-    ) -> Result<Self> {
-        let wrapped_build_map: Arc<dyn ExecutionPlan> = Arc::new(match build_side {
-            JoinSide::Left => BroadcastJoinBuildHashMapExec::new(
-                left.clone(),
-                on.iter().map(|(l, _)| (l.clone())).collect(),
-            ),
-            JoinSide::Right => BroadcastJoinBuildHashMapExec::new(
-                right.clone(),
-                on.iter().map(|(_, r)| (r.clone())).collect(),
-            ),
-        });
-        let wrapped_bhj: Arc<dyn ExecutionPlan> = Arc::new(BroadcastJoinExec::try_new(
-            schema,
-            match build_side {
-                JoinSide::Left => wrapped_build_map.clone(),
-                JoinSide::Right => left.clone(),
-            },
-            match build_side {
-                JoinSide::Left => right.clone(),
-                JoinSide::Right => wrapped_build_map.clone(),
-            },
-            on,
-            join_type,
-            build_side,
-            None,
-        )?);
-        Ok(Self {
-            left,
-            right,
-            wrapped_bhj,
-        })
-    }
-}
-
-impl DisplayAs for HashJoinExec {
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "HashJoin: wrapped ")?;
-        self.wrapped_bhj.fmt_as(t, f)?;
-        Ok(())
-    }
-}
-
-impl ExecutionPlan for HashJoinExec {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.wrapped_bhj.schema()
-    }
-
-    fn output_partitioning(&self) -> Partitioning {
-        self.wrapped_bhj.output_partitioning()
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.wrapped_bhj.output_ordering()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.left.clone(), self.right.clone()]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let wrapped_bhj = downcast_any!(self.wrapped_bhj, BroadcastJoinExec)?;
-        Ok(Arc::new(Self::try_new(
-            self.schema(),
-            children[0].clone(),
-            children[1].clone(),
-            wrapped_bhj.on().clone(),
-            wrapped_bhj.join_type(),
-            wrapped_bhj.broadcast_side(),
-        )?))
-    }
-
-    fn execute(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> Result<SendableRecordBatchStream> {
-        let wrapped_bhj = self.wrapped_bhj.clone();
-        let stream = Box::pin(self.wrapped_bhj.execute(partition, context)?.chain(poll_fn(
-            move |_| {
-                // add build map time to join time
-                let bhj = downcast_any!(&wrapped_bhj, BroadcastJoinExec).unwrap();
-                let build_map = match bhj.broadcast_side() {
-                    JoinSide::Left => &bhj.children()[0],
-                    JoinSide::Right => &bhj.children()[1],
-                };
-                let build_map_time = build_map
-                    .metrics()
-                    .unwrap()
-                    .elapsed_compute()
-                    .unwrap_or_default();
-                for metric in bhj.metrics().unwrap().iter() {
-                    if let MetricValue::ElapsedCompute(time) = metric.value() {
-                        time.add_duration(Duration::from_nanos(build_map_time as u64))
-                    }
-                }
-                Poll::Ready(None)
-            },
-        )));
-        Ok(Box::pin(RecordBatchStreamAdapter::new(
-            self.schema(),
-            stream,
-        )))
-    }
-
-    fn metrics(&self) -> Option<MetricsSet> {
-        self.wrapped_bhj.metrics()
-    }
-}
diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
index 9688e5a..7a77da1 100644
--- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
@@ -17,14 +17,20 @@
     fmt::{Debug, Formatter},
     fs::File,
     io::{BufReader, Read, Seek, SeekFrom},
-    sync::Arc,
+    sync::{
+        atomic::{AtomicUsize, Ordering::SeqCst},
+        Arc,
+    },
 };
 
-use arrow::datatypes::SchemaRef;
+use arrow::{
+    array::{Array, ArrayRef},
+    datatypes::SchemaRef,
+};
 use async_trait::async_trait;
 use blaze_jni_bridge::{
-    jni_call, jni_call_static, jni_get_object_class, jni_get_string, jni_new_direct_byte_buffer,
-    jni_new_global_ref, jni_new_string,
+    jni_call, jni_call_static, jni_get_byte_array_region, jni_get_direct_buffer, jni_get_string,
+    jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_string,
 };
 use datafusion::{
     error::{DataFusionError, Result},
@@ -39,7 +45,8 @@
     },
 };
 use datafusion_ext_commons::{
-    array_size::ArraySize, df_execution_err, streams::coalesce_stream::CoalesceInput,
+    array_size::ArraySize, batch_size, df_execution_err, io::recover_named_batch,
+    streams::coalesce_stream::coalesce_arrays_unchecked, suggested_output_batch_mem_size,
 };
 use futures::{stream::once, TryStreamExt};
 use jni::objects::{GlobalRef, JObject};
@@ -119,27 +126,30 @@
         let elapsed_compute = baseline_metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();
 
-        let segments_provider = jni_call_static!(
+        let blocks_provider = jni_call_static!(
             JniBridge.getResource(
                 jni_new_string!(&self.ipc_provider_resource_id)?.as_obj()
             ) -> JObject
         )?;
-        let segments_local =
-            jni_call!(ScalaFunction0(segments_provider.as_obj()).apply() -> JObject)?;
-        let segments = jni_new_global_ref!(segments_local.as_obj())?;
+        assert!(!blocks_provider.as_obj().is_null());
 
+        let blocks_local = jni_call!(ScalaFunction0(blocks_provider.as_obj()).apply() -> JObject)?;
+        assert!(!blocks_local.as_obj().is_null());
+
+        let blocks = jni_new_global_ref!(blocks_local.as_obj())?;
         let ipc_stream = Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
             once(read_ipc(
+                partition,
                 context.clone(),
                 self.schema(),
-                segments,
+                blocks,
                 baseline_metrics.clone(),
                 size_counter,
             ))
             .try_flatten(),
         ));
-        Ok(context.coalesce_with_default_batch_size(ipc_stream, &baseline_metrics)?)
+        Ok(ipc_stream)
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
@@ -152,89 +162,140 @@
 }
 
 pub async fn read_ipc(
+    partition_id: usize,
     context: Arc<TaskContext>,
     schema: SchemaRef,
-    segments: GlobalRef,
+    blocks: GlobalRef,
     baseline_metrics: BaselineMetrics,
     size_counter: Count,
 ) -> Result<SendableRecordBatchStream> {
+    log::info!("[partition={partition_id}] start ipc reading");
     context.output_with_sender("IpcReader", schema.clone(), move |sender| async move {
         let mut timer = baseline_metrics.elapsed_compute().timer();
+        let batch_size = batch_size();
+        let staging_cols: Arc<Mutex<Vec<Vec<ArrayRef>>>> = Arc::new(Mutex::new(vec![]));
+        let staging_num_rows = AtomicUsize::new(0);
+        let staging_mem_size = AtomicUsize::new(0);
+
         loop {
-            // get next segment
-            let segments = segments.clone();
+            // get next block
+            let blocks = blocks.clone();
             let next = tokio::task::spawn_blocking(move || {
-                if !jni_call!(ScalaIterator(segments.as_obj()).hasNext() -> bool)? {
+                if !jni_call!(ScalaIterator(blocks.as_obj()).hasNext() -> bool)? {
                     return Ok::<_, DataFusionError>(None);
                 }
-                let segment = jni_new_global_ref!(
-                    jni_call!(ScalaIterator(segments.as_obj()).next() -> JObject)?.as_obj()
+                let block = jni_new_global_ref!(
+                    jni_call!(ScalaIterator(blocks.as_obj()).next() -> JObject)?.as_obj()
                 )?;
-                let segment_class = jni_get_object_class!(segment.as_obj())?;
-                let segment_classname_obj =
-                    jni_call!(Class(segment_class.as_obj()).getName() -> JObject)?;
-                let segment_classname = jni_get_string!(segment_classname_obj.as_obj().into())?;
-                Ok(Some((segment_classname, segment)))
+                Ok(Some(block))
             })
             .await
             .or_else(|err| df_execution_err!("{err}"))??;
 
             // get ipc reader
             let reader = Arc::new(Mutex::new(match next {
-                Some((segment_classname, segment)) => {
-                    if segment_classname == "org.apache.spark.storage.FileSegment" {
-                        get_file_segment_reader(schema.clone(), segment.as_obj())?
-                    } else {
-                        get_channel_reader(schema.clone(), segment.as_obj())?
-                    }
+                Some(block) if jni_call!(BlazeBlockObject(block.as_obj()).hasFileSegment() -> bool)? => {
+                    get_file_reader(block.as_obj())?
                 }
+                Some(block) if jni_call!(BlazeBlockObject(block.as_obj()).hasByteBuffer() -> bool)? => {
+                    get_byte_buffer_reader(block.as_obj())?
+                }
+                Some(block) => get_channel_reader(block.as_obj())?,
                 None => break,
             }));
 
-            while let Some(batch) = {
-                let reader = reader.clone();
-                tokio::task::spawn_blocking(move || reader.lock().read_batch())
+            while let Some((num_rows, cols)) = {
+                let reader_cloned = reader.clone();
+                tokio::task::spawn_blocking(move || reader_cloned.clone().lock().read_batch())
                     .await
                     .or_else(|err| df_execution_err!("{err}"))??
             } {
-                size_counter.add(batch.get_array_mem_size());
-                baseline_metrics.record_output(batch.num_rows());
-                sender.send(Ok(batch), Some(&mut timer)).await;
+
+                let (cur_staging_num_rows, cur_staging_mem_size) = {
+                    let staging_cols_cloned = staging_cols.clone();
+                    let mut staging_cols = staging_cols_cloned.lock();
+                    let mut cols_mem_size = 0;
+                    staging_cols.resize_with(cols.len(), || vec![]);
+                    for (col_idx, col) in cols.into_iter().enumerate() {
+                        cols_mem_size += col.get_array_mem_size();
+                        staging_cols[col_idx].push(col);
+                    }
+                    drop(staging_cols);
+                    staging_num_rows.fetch_add(num_rows, SeqCst);
+                    staging_mem_size.fetch_add(cols_mem_size, SeqCst);
+                    (staging_num_rows.load(SeqCst), staging_mem_size.load(SeqCst))
+                };
+
+                if cur_staging_num_rows >= batch_size
+                    || cur_staging_mem_size >= suggested_output_batch_mem_size()
+                {
+                    let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock())
+                        .into_iter()
+                        .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols))
+                        .collect::<Vec<_>>();
+                    let batch = recover_named_batch(
+                        cur_staging_num_rows,
+                        &coalesced_cols,
+                        schema.clone(),
+                    )?;
+                    staging_num_rows.store(0, SeqCst);
+                    staging_mem_size.store(0, SeqCst);
+                    size_counter.add(batch.get_array_mem_size());
+                    baseline_metrics.record_output(batch.num_rows());
+                    sender.send(Ok(batch), Some(&mut timer)).await;
+                }
             }
         }
+
+        let cur_staging_num_rows = staging_num_rows.load(SeqCst);
+        if cur_staging_num_rows > 0 {
+            let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock())
+                .into_iter()
+                .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols))
+                .collect::<Vec<_>>();
+            let batch = recover_named_batch(
+                cur_staging_num_rows,
+                &coalesced_cols,
+                schema.clone(),
+            )?;
+            size_counter.add(batch.get_array_mem_size());
+            baseline_metrics.record_output(batch.num_rows());
+            sender.send(Ok(batch), Some(&mut timer)).await;
+        }
         Ok(())
     })
 }
 
-fn get_channel_reader(
-    schema: SchemaRef,
-    channel: JObject,
-) -> Result<IpcCompressionReader<Box<dyn Read + Send>>> {
-    let global_ref = jni_new_global_ref!(channel)?;
-    let channel_reader = ReadableByteChannelReader::new(global_ref);
-
-    Ok(IpcCompressionReader::new(
-        Box::new(BufReader::with_capacity(65536, channel_reader)),
-        schema,
-    ))
+fn get_channel_reader(block: JObject) -> Result<IpcCompressionReader<Box<dyn Read + Send>>> {
+    let channel_reader = ReadableByteChannelReader::try_new(block)?;
+    Ok(IpcCompressionReader::new(Box::new(
+        BufReader::with_capacity(65536, channel_reader),
+    )))
 }
 
-fn get_file_segment_reader(
-    schema: SchemaRef,
-    file_segment: JObject,
-) -> Result<IpcCompressionReader<Box<dyn Read + Send>>> {
-    let file = jni_call!(SparkFileSegment(file_segment).file() -> JObject)?;
-    let path = jni_call!(JavaFile(file.as_obj()).getPath() -> JObject)?;
+fn get_file_reader(block: JObject) -> Result<IpcCompressionReader<Box<dyn Read + Send>>> {
+    let path = jni_call!(BlazeBlockObject(block).getFilePath() -> JObject)?;
     let path = jni_get_string!(path.as_obj().into())?;
-    let offset = jni_call!(SparkFileSegment(file_segment).offset() -> i64)?;
-    let length = jni_call!(SparkFileSegment(file_segment).length() -> i64)?;
-
+    let offset = jni_call!(BlazeBlockObject(block).getFileOffset() -> i64)?;
+    let length = jni_call!(BlazeBlockObject(block).getFileLength() -> i64)?;
     let mut file = File::open(path)?;
     file.seek(SeekFrom::Start(offset as u64))?;
-    Ok(IpcCompressionReader::new(
-        Box::new(BufReader::with_capacity(65536, file.take(length as u64))),
-        schema,
-    ))
+    Ok(IpcCompressionReader::new(Box::new(
+        BufReader::with_capacity(65536, file.take(length as u64)),
+    )))
+}
+
+fn get_byte_buffer_reader(block: JObject) -> Result<IpcCompressionReader<Box<dyn Read + Send>>> {
+    let byte_buffer = jni_call!(BlazeBlockObject(block).getByteBuffer() -> JObject)?;
+    if jni_call!(JavaBuffer(byte_buffer.as_obj()).isDirect() -> bool)? {
+        let reader = DirectByteBufferReader::try_new(block, byte_buffer.as_obj())?;
+        return Ok(IpcCompressionReader::new(Box::new(reader)));
+    }
+    if jni_call!(JavaBuffer(byte_buffer.as_obj()).hasArray() -> bool)? {
+        let reader = HeapByteBufferReader::try_new(block, byte_buffer.as_obj())?;
+        return Ok(IpcCompressionReader::new(Box::new(reader)));
+    }
+    df_execution_err!("ByteBuffer is not direct and do not have array")
 }
 
 struct ReadableByteChannelReader {
@@ -242,11 +303,13 @@
     closed: bool,
 }
 impl ReadableByteChannelReader {
-    pub fn new(channel: GlobalRef) -> Self {
-        Self {
-            channel,
+    pub fn try_new(block: JObject) -> Result<Self> {
+        let channel = jni_call!(BlazeBlockObject(block).getChannel() -> JObject)?;
+        let global_ref = jni_new_global_ref!(channel.as_obj())?;
+        Ok(Self {
+            channel: global_ref,
             closed: false,
-        }
+        })
     }
 
     pub fn close(&mut self) -> Result<()> {
@@ -291,3 +354,110 @@
         let _ = self.close();
     }
 }
+
+struct DirectByteBufferReader {
+    block: GlobalRef, // closed through JavaAutoCloseable in close()
+    byte_buffer: GlobalRef, // presumably keeps the memory behind `data` reachable -- TODO confirm
+    data: &'static [u8], // slice returned by jni_get_direct_buffer! for `byte_buffer`
+    pos: usize, // bytes consumed so far
+    remaining: usize, // bytes not yet consumed
+}
+
+impl DirectByteBufferReader {
+    pub fn try_new(block: JObject, byte_buffer: JObject) -> Result<Self> {
+        let block_global_ref = jni_new_global_ref!(block)?;
+        let byte_buffer_global_ref = jni_new_global_ref!(byte_buffer)?;
+        let data = jni_get_direct_buffer!(byte_buffer_global_ref.as_obj())?;
+        Ok(Self {
+            block: block_global_ref,
+            byte_buffer: byte_buffer_global_ref,
+            data,
+            pos: 0,
+            remaining: data.len(),
+        })
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        jni_call!(JavaAutoCloseable(self.block.as_obj()).close() -> ())?;
+        Ok(())
+    }
+
+    fn read_impl(&mut self, buf: &mut [u8]) -> Result<usize> {
+        let read_len = buf.len().min(self.remaining); // 0 once drained, which signals EOF
+        buf[..read_len].copy_from_slice(&self.data[self.pos..][..read_len]); // resume at `pos`
+        self.pos += read_len;
+        self.remaining -= read_len;
+        Ok(read_len)
+    }
+}
+
+impl Read for DirectByteBufferReader {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        self.read_impl(buf).map_err(std::io::Error::other)
+    }
+}
+
+impl Drop for DirectByteBufferReader {
+    fn drop(&mut self) {
+        // ensure the block is closed
+        let _ = self.close();
+        let _ = self.byte_buffer;
+    }
+}
+
+struct HeapByteBufferReader {
+    block: GlobalRef,
+    byte_array: GlobalRef,
+    pos: usize,
+    remaining: usize,
+}
+unsafe impl Send for HeapByteBufferReader {} // jarray is safe to send
+
+impl HeapByteBufferReader {
+    pub fn try_new(block: JObject, byte_buffer: JObject) -> Result<Self> {
+        let block_ref = jni_new_global_ref!(block)?;
+        let backing_array = jni_call!(JavaBuffer(byte_buffer).array() -> JObject)?;
+        let start = jni_call!(JavaBuffer(byte_buffer).position() -> i32)? as usize;
+        let unread = jni_call!(JavaBuffer(byte_buffer).remaining() -> i32)? as usize;
+        let byte_array_ref = jni_new_global_ref!(backing_array.as_obj())?;
+        Ok(Self {
+            block: block_ref,
+            byte_array: byte_array_ref,
+            pos: start, // array index handed to the next region copy
+            remaining: unread, // bytes still to be served by read()
+        })
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        jni_call!(JavaAutoCloseable(self.block.as_obj()).close() -> ())?;
+        Ok(())
+    }
+
+    fn read_impl(&mut self, buf: &mut [u8]) -> Result<usize> {
+        let read_len = buf.len().min(self.remaining);
+        // presumably copies array[pos..pos + read_len] into buf -- confirm against the macro
+        jni_get_byte_array_region!(
+            self.byte_array.as_obj().cast(),
+            self.pos,
+            &mut buf[..read_len]
+        )?;
+        self.pos += read_len;
+        self.remaining -= read_len;
+        Ok(read_len)
+    }
+}
+
+impl Read for HeapByteBufferReader {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        self.read_impl(buf).map_err(std::io::Error::other)
+    }
+}
+
+impl Drop for HeapByteBufferReader {
+    fn drop(&mut self) {
+        // ensure the block is closed
+        let _ = self.close();
+        let _ = self.byte_array;
+        let _ = self.block;
+    }
+}
diff --git a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
index b5a58cf..20fe5a2 100644
--- a/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ipc_writer_exec.rs
@@ -152,9 +152,8 @@
         let mut writer = IpcCompressionWriter::new(IpcConsumerWrite(ipc_consumer), true);
         while let Some(batch) = input.next().await.transpose()? {
             let _timer = metrics.elapsed_compute().timer();
-            let num_rows = batch.num_rows();
-            writer.write_batch(batch)?;
-            metrics.record_output(num_rows);
+            writer.write_batch(batch.num_rows(), batch.columns())?;
+            metrics.record_output(batch.num_rows());
         }
 
         let _timer = metrics.elapsed_compute().timer();
diff --git a/native-engine/datafusion-ext-plans/src/joins/bhj/full_join.rs b/native-engine/datafusion-ext-plans/src/joins/bhj/full_join.rs
index ca51b56..1e7b6f3 100644
--- a/native-engine/datafusion-ext-plans/src/joins/bhj/full_join.rs
+++ b/native-engine/datafusion-ext-plans/src/joins/bhj/full_join.rs
@@ -20,7 +20,10 @@
     },
 };
 
-use arrow::array::{new_null_array, ArrayRef, RecordBatch};
+use arrow::{
+    array::{new_null_array, Array, ArrayRef, RecordBatch},
+    buffer::NullBuffer,
+};
 use async_trait::async_trait;
 use bitvec::{bitvec, prelude::BitVec};
 use datafusion::{common::Result, physical_plan::metrics::Time};
@@ -30,9 +33,8 @@
     common::{batch_selection::take_cols, output::WrappedRecordBatchSender},
     joins::{
         bhj::{
-            filter_joined_indices,
             full_join::ProbeSide::{L, R},
-            ProbeSide,
+            make_eq_comparator_multiple_arrays, ProbeSide,
         },
         join_hash_map::{join_create_hashes, JoinHashMap},
         JoinParams,
@@ -136,20 +138,12 @@
     async fn flush_hash_joined(
         mut self: Pin<&mut Self>,
         probed_batch: &RecordBatch,
-        probed_key_columns: &[ArrayRef],
         probed_joined: &mut BitVec,
-        mut hash_joined_probe_indices: Vec<u32>,
-        mut hash_joined_build_indices: Vec<u32>,
+        hash_joined_probe_indices: Vec<u32>,
+        hash_joined_build_indices: Vec<u32>,
     ) -> Result<()> {
-        filter_joined_indices(
-            probed_key_columns,
-            self.map.key_columns(),
-            &mut hash_joined_probe_indices,
-            &mut hash_joined_build_indices,
-        )?;
         let probe_indices = hash_joined_probe_indices;
         let build_indices = hash_joined_build_indices;
-
         let pprojected = match P.probe_side {
             L => self
                 .join_params
@@ -197,26 +191,41 @@
         let mut hash_joined_build_indices: Vec<u32> = vec![];
         let mut probed_joined = bitvec![0; probed_batch.num_rows()];
         let batch_size = self.join_params.batch_size.max(probed_batch.num_rows());
-
         let probed_key_columns = self.create_probed_key_columns(&probed_batch)?;
         let probed_hashes = join_create_hashes(probed_batch.num_rows(), &probed_key_columns)?;
+        let probed_valids = probed_key_columns
+            .iter()
+            .map(|col| col.nulls().cloned())
+            .reduce(|nb1, nb2| NullBuffer::union(nb1.as_ref(), nb2.as_ref()))
+            .flatten();
+
+        let eq = make_eq_comparator_multiple_arrays(&probed_key_columns, self.map.key_columns())?;
 
         // join by hash code
         for (row_idx, &hash) in probed_hashes.iter().enumerate() {
+            // nulls may not be joined
+            if probed_valids
+                .as_ref()
+                .map(|nb| nb.is_null(row_idx))
+                .unwrap_or(false)
+            {
+                continue;
+            }
+
             let mut maybe_joined = false;
             if let Some(entries) = self.map.entry_indices(hash) {
                 for map_idx in entries {
-                    hash_joined_probe_indices.push(row_idx as u32);
-                    hash_joined_build_indices.push(map_idx);
+                    if eq(row_idx, map_idx as usize) {
+                        hash_joined_probe_indices.push(row_idx as u32);
+                        hash_joined_build_indices.push(map_idx);
+                    }
                 }
                 maybe_joined = true;
             }
-
             if maybe_joined && hash_joined_probe_indices.len() > batch_size {
                 self.as_mut()
                     .flush_hash_joined(
                         &probed_batch,
-                        &probed_key_columns,
                         &mut probed_joined,
                         std::mem::take(&mut hash_joined_probe_indices),
                         std::mem::take(&mut hash_joined_build_indices),
@@ -224,11 +233,11 @@
                     .await?;
             }
         }
+
         if !hash_joined_probe_indices.is_empty() {
             self.as_mut()
                 .flush_hash_joined(
                     &probed_batch,
-                    &probed_key_columns,
                     &mut probed_joined,
                     hash_joined_probe_indices,
                     hash_joined_build_indices,
diff --git a/native-engine/datafusion-ext-plans/src/joins/bhj/mod.rs b/native-engine/datafusion-ext-plans/src/joins/bhj/mod.rs
index 57d934c..9f65702 100644
--- a/native-engine/datafusion-ext-plans/src/joins/bhj/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/joins/bhj/mod.rs
@@ -12,12 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use arrow::{
-    array::*,
-    datatypes::{DataType, IntervalUnit, TimeUnit},
-};
+use arrow::array::ArrayRef;
 use datafusion::common::Result;
-use datafusion_ext_commons::{df_execution_err, downcast_any};
+use datafusion_ext_commons::df_execution_err;
+
+use crate::common::make_eq_comparator::{make_eq_comparator, DynEqComparator};
 
 pub mod full_join;
 pub mod semi_join;
@@ -28,119 +27,22 @@
     R,
 }
 
-fn filter_joined_indices(
-    key_columns1: &[ArrayRef],
-    key_columns2: &[ArrayRef],
-    indices1: &mut Vec<u32>,
-    indices2: &mut Vec<u32>,
-) -> Result<()> {
-    fn filter_one(
-        key_column1: &ArrayRef,
-        key_column2: &ArrayRef,
-        indices1: &mut Vec<u32>,
-        indices2: &mut Vec<u32>,
-    ) -> Result<()> {
-        macro_rules! filter_atomic {
-            ($cast_type:ty) => {{
-                let col1 = downcast_any!(key_column1, $cast_type)?;
-                let col2 = downcast_any!(key_column2, $cast_type)?;
-                let mut valid_count = 0;
-                for i in 0..indices1.len() {
-                    let idx1 = indices1[i] as usize;
-                    let idx2 = indices2[i] as usize;
-                    if col1.is_valid(idx1) && col2.is_valid(idx2) && {
-                        let v1 = col1.value(idx1);
-                        let v2 = col2.value(idx2);
-                        v1 == v2
-                    } {
-                        indices1[valid_count] = indices1[i];
-                        indices2[valid_count] = indices2[i];
-                        valid_count += 1;
-                    }
-                }
-                indices1.truncate(valid_count);
-                indices2.truncate(valid_count);
-            }};
-        }
-
-        let dt1 = key_column1.data_type();
-        let dt2 = key_column2.data_type();
-        if dt1 != dt2 {
-            return df_execution_err!("join key data type not matched: {dt1:?} <-> {dt2:?}");
-        }
-        match dt1 {
-            DataType::Null => {
-                indices1.clear();
-                indices2.clear();
-            }
-            DataType::Boolean => filter_atomic!(BooleanArray),
-            DataType::Int8 => filter_atomic!(Int8Array),
-            DataType::Int16 => filter_atomic!(Int16Array),
-            DataType::Int32 => filter_atomic!(Int32Array),
-            DataType::Int64 => filter_atomic!(Int64Array),
-            DataType::UInt8 => filter_atomic!(UInt8Array),
-            DataType::UInt16 => filter_atomic!(UInt16Array),
-            DataType::UInt32 => filter_atomic!(UInt32Array),
-            DataType::UInt64 => filter_atomic!(UInt64Array),
-            DataType::Float16 => filter_atomic!(Float16Array),
-            DataType::Float32 => filter_atomic!(Float32Array),
-            DataType::Float64 => filter_atomic!(Float64Array),
-            DataType::Timestamp(unit, _) => match unit {
-                TimeUnit::Second => filter_atomic!(TimestampSecondArray),
-                TimeUnit::Millisecond => filter_atomic!(TimestampMillisecondArray),
-                TimeUnit::Microsecond => filter_atomic!(TimestampMicrosecondArray),
-                TimeUnit::Nanosecond => filter_atomic!(TimestampNanosecondArray),
-            },
-            DataType::Date32 => filter_atomic!(Date32Array),
-            DataType::Date64 => filter_atomic!(Date64Array),
-            DataType::Time32(unit) => match unit {
-                TimeUnit::Second => filter_atomic!(Time32SecondArray),
-                TimeUnit::Millisecond => filter_atomic!(Time32MillisecondArray),
-                TimeUnit::Microsecond => filter_atomic!(Time32MillisecondArray),
-                TimeUnit::Nanosecond => filter_atomic!(Time32MillisecondArray),
-            },
-            DataType::Time64(unit) => match unit {
-                TimeUnit::Microsecond => filter_atomic!(Time64MicrosecondArray),
-                TimeUnit::Nanosecond => filter_atomic!(Time64NanosecondArray),
-                _ => return df_execution_err!("unsupported time64 unit: {unit:?}"),
-            },
-            DataType::Duration(unit) => match unit {
-                TimeUnit::Second => filter_atomic!(DurationSecondArray),
-                TimeUnit::Millisecond => filter_atomic!(DurationMillisecondArray),
-                TimeUnit::Microsecond => filter_atomic!(DurationMicrosecondArray),
-                TimeUnit::Nanosecond => filter_atomic!(DurationNanosecondArray),
-            },
-            DataType::Interval(unit) => match unit {
-                IntervalUnit::YearMonth => filter_atomic!(IntervalYearMonthArray),
-                IntervalUnit::DayTime => filter_atomic!(IntervalDayTimeArray),
-                IntervalUnit::MonthDayNano => filter_atomic!(IntervalMonthDayNanoArray),
-            },
-            DataType::Binary => filter_atomic!(BinaryArray),
-            DataType::FixedSizeBinary(_) => filter_atomic!(FixedSizeBinaryArray),
-            DataType::LargeBinary => filter_atomic!(LargeBinaryArray),
-            DataType::Utf8 => filter_atomic!(StringArray),
-            DataType::LargeUtf8 => filter_atomic!(LargeStringArray),
-            DataType::List(_) => filter_atomic!(ListArray),
-            DataType::FixedSizeList(..) => filter_atomic!(FixedSizeListArray),
-            DataType::LargeList(_) => filter_atomic!(LargeListArray),
-            DataType::Struct(_) => filter_joined_indices(
-                key_column1.as_struct().columns(),
-                key_column2.as_struct().columns(),
-                indices1,
-                indices2,
-            )?,
-            DataType::Decimal128(..) => filter_atomic!(Decimal128Array),
-            DataType::Decimal256(..) => filter_atomic!(Decimal256Array),
-            DataType::Map(..) => filter_atomic!(MapArray),
-            dt => {
-                return df_execution_err!("unsupported data type: {dt:?}");
-            }
-        }
-        Ok(())
+pub fn make_eq_comparator_multiple_arrays(
+    cols1: &[ArrayRef],
+    cols2: &[ArrayRef],
+) -> Result<DynEqComparator> {
+    if cols1.len() != cols2.len() {
+        return df_execution_err!(
+            "make_eq_comparator_multiple_arrays: cols1.len ({}) != cols2.len ({})",
+            cols1.len(),
+            cols2.len(),
+        );
     }
 
-    for (key_column1, key_column2) in key_columns1.iter().zip(key_columns2) {
-        filter_one(key_column1, key_column2, indices1, indices2)?;
-    }
-    Ok(())
+    let eqs = cols1
+        .iter()
+        .zip(cols2)
+        .map(|(col1, col2)| Ok(make_eq_comparator(col1, col2)?))
+        .collect::<Result<Vec<_>>>()?;
+    Ok(Box::new(move |i, j| eqs.iter().all(|eq| eq(i, j))))
 }
diff --git a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
index 8c168f0..2a131de 100644
--- a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
+++ b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
@@ -20,17 +20,21 @@
     },
 };
 
-use arrow::array::{ArrayRef, BooleanArray, RecordBatch};
+use arrow::{
+    array::{ArrayRef, BooleanArray, RecordBatch},
+    buffer::NullBuffer,
+};
 use async_trait::async_trait;
 use bitvec::{bitvec, prelude::BitVec};
 use datafusion::{common::Result, physical_plan::metrics::Time};
+use hashbrown::HashSet;
 
 use crate::{
     broadcast_join_exec::Joiner,
     common::{batch_selection::take_cols, output::WrappedRecordBatchSender},
     joins::{
         bhj::{
-            filter_joined_indices,
+            make_eq_comparator_multiple_arrays,
             semi_join::{
                 ProbeSide::{L, R},
                 SemiMode::{Anti, Existence, Semi},
@@ -92,6 +96,7 @@
     join_params: JoinParams,
     output_sender: Arc<WrappedRecordBatchSender>,
     map_joined: BitVec,
+    hash_skippable: HashSet<i32>,
     map: Arc<JoinHashMap>,
     send_output_time: Time,
     output_rows: AtomicUsize,
@@ -109,6 +114,7 @@
             output_sender,
             map,
             map_joined,
+            hash_skippable: HashSet::new(),
             send_output_time: Time::new(),
             output_rows: AtomicUsize::new(0),
         }
@@ -139,31 +145,6 @@
         drop(timer);
         Ok(())
     }
-
-    fn flush_hash_joined(
-        mut self: Pin<&mut Self>,
-        probed_key_columns: &[ArrayRef],
-        probed_joined: &mut BitVec,
-        mut hash_joined_probe_indices: Vec<u32>,
-        mut hash_joined_build_indices: Vec<u32>,
-    ) -> Result<()> {
-        filter_joined_indices(
-            probed_key_columns,
-            self.map.key_columns(),
-            &mut hash_joined_probe_indices,
-            &mut hash_joined_build_indices,
-        )?;
-        let probe_indices = hash_joined_probe_indices;
-        let build_indices = hash_joined_build_indices;
-
-        for &idx in &probe_indices {
-            probed_joined.set(idx as usize, true);
-        }
-        for &idx in &build_indices {
-            self.map_joined.set(idx as usize, true);
-        }
-        Ok(())
-    }
 }
 
 #[async_trait]
@@ -172,38 +153,62 @@
         let mut hash_joined_probe_indices: Vec<u32> = vec![];
         let mut hash_joined_build_indices: Vec<u32> = vec![];
         let mut probed_joined = bitvec![0; probed_batch.num_rows()];
+        let map_joined = unsafe {
+            // safety: ignore r/w conflicts with self.map
+            std::mem::transmute::<_, &mut BitVec>(&mut self.map_joined)
+        };
 
         let probed_key_columns = self.create_probed_key_columns(&probed_batch)?;
         let probed_hashes = join_create_hashes(probed_batch.num_rows(), &probed_key_columns)?;
+        let probed_valids = probed_key_columns
+            .iter()
+            .map(|col| col.nulls().cloned())
+            .reduce(|nb1, nb2| NullBuffer::union(nb1.as_ref(), nb2.as_ref()))
+            .flatten();
+
+        let eq = make_eq_comparator_multiple_arrays(&probed_key_columns, self.map.key_columns())?;
 
         // join by hash code
         for (row_idx, &hash) in probed_hashes.iter().enumerate() {
+            // nulls may not be joined
+            if probed_valids
+                .as_ref()
+                .map(|nb| nb.is_null(row_idx))
+                .unwrap_or(false)
+            {
+                continue;
+            }
+
+            // map is join side -- skip hashes those may not be joined
+            if !P.probe_is_join_side && self.hash_skippable.contains(&hash) {
+                continue;
+            }
+
             let mut maybe_joined = false;
             if let Some(entries) = self.map.entry_indices(hash) {
                 for map_idx in entries {
-                    hash_joined_probe_indices.push(row_idx as u32);
-                    hash_joined_build_indices.push(map_idx);
+                    // join only once if map side is the join side
+                    if !P.probe_is_join_side && map_joined[map_idx as usize] {
+                        continue;
+                    }
+                    if eq(row_idx, map_idx as usize) {
+                        hash_joined_probe_indices.push(row_idx as u32);
+                        hash_joined_build_indices.push(map_idx);
+                        if P.probe_is_join_side {
+                            probed_joined.set(row_idx, true);
+                        } else {
+                            map_joined.set(map_idx as usize, true);
+                        }
+                    }
+                    maybe_joined = true;
                 }
-                maybe_joined = true;
             }
 
-            if maybe_joined && hash_joined_probe_indices.len() >= self.join_params.batch_size {
-                self.as_mut().flush_hash_joined(
-                    &probed_key_columns,
-                    &mut probed_joined,
-                    std::mem::take(&mut hash_joined_probe_indices),
-                    std::mem::take(&mut hash_joined_build_indices),
-                )?;
+            // map is join side -- mark the hash as skippable
+            if !P.probe_is_join_side && !maybe_joined {
+                self.hash_skippable.insert(hash);
             }
         }
-        if !hash_joined_probe_indices.is_empty() {
-            self.as_mut().flush_hash_joined(
-                &probed_key_columns,
-                &mut probed_joined,
-                hash_joined_probe_indices,
-                hash_joined_build_indices,
-            )?;
-        }
 
         if P.probe_is_join_side {
             let pprojected = match P.probe_side {
@@ -273,6 +278,14 @@
         Ok(())
     }
 
+    fn can_early_stop(&self) -> bool {
+        if !P.probe_is_join_side && self.map_joined.all() {
+            // semi join: map is join side and all items are joined
+            return true;
+        }
+        false
+    }
+
     fn total_send_output_time(&self) -> usize {
         self.send_output_time.value()
     }
diff --git a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs
index 91b2ffd..089bd67 100644
--- a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs
+++ b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::{
+    fmt::{Debug, Formatter},
     io::{Cursor, Read, Write},
     slice::{from_raw_parts, from_raw_parts_mut},
     sync::Arc,
@@ -20,32 +21,41 @@
 
 use arrow::{
     array::{Array, ArrayRef, AsArray, BinaryBuilder, RecordBatch},
+    buffer::NullBuffer,
     datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
 };
 use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt};
 use datafusion::{common::Result, physical_expr::PhysicalExprRef};
-use datafusion_ext_commons::spark_hash::create_murmur3_hashes;
-use hashbrown::HashMap;
-use itertools::Itertools;
+use datafusion_ext_commons::spark_hash::create_hashes;
 use once_cell::sync::OnceCell;
 
-use crate::common::batch_selection::take_batch;
+use crate::joins::bhj::make_eq_comparator_multiple_arrays;
 
 pub struct Table {
-    entry_offsets: Vec<u32>,
-    entry_lens: Vec<u32>,
-    item_indices: Vec<u32>,
-    item_hashes: Vec<i32>,
+    entries: Vec<Entry>,
+    items: Vec<Item>,
+}
+
+#[derive(Clone, Copy, Default, Debug)]
+#[repr(C)] // Table reads and writes these as raw 8-byte records
+struct Entry {
+    offset: u32, // index into Table::items of this entry's first item
+    len: u32, // number of consecutive items belonging to this entry
+}
+
+#[derive(Clone, Copy, Default, Debug)]
+#[repr(C)] // Table reads and writes these as raw 8-byte records
+struct Item {
+    idx: u32, // row index of the key within the key columns
+    hash: i32, // hash of that row's join key, compared again on lookup
+}
 
 impl Table {
     pub fn new_empty() -> Self {
         let num_entries = Self::num_entries_of_rows(0);
         Self {
-            entry_offsets: vec![0; num_entries],
-            entry_lens: vec![0; num_entries],
-            item_indices: vec![],
-            item_hashes: vec![],
+            entries: vec![Entry::default(); num_entries],
+            items: vec![],
         }
     }
 
@@ -54,70 +64,58 @@
         data_batch: RecordBatch,
         key_columns: &[ArrayRef],
     ) -> Result<(Self, RecordBatch)> {
-        // returns the new data batch sorted by hashes
-
         assert!(
             num_rows < 1073741824,
             "join hash table: number of rows exceeded 2^30: {num_rows}"
         );
 
-        let num_entries = Self::num_entries_of_rows(num_rows) as u32;
+        let num_entries = Self::num_entries_of_rows(num_rows);
         let item_hashes = join_create_hashes(num_rows, &key_columns)?;
-        let item_valids = (0..num_rows)
-            .map(|row_idx| key_columns.iter().all(|key| key.is_valid(row_idx)))
+        let item_valids = key_columns
+            .iter()
+            .map(|col| col.nulls().cloned())
+            .reduce(|nb1, nb2| NullBuffer::union(nb1.as_ref(), nb2.as_ref()))
+            .flatten();
+        let item_is_valid = |row_idx| match &item_valids {
+            Some(nb) => nb.is_valid(row_idx),
+            None => true,
+        };
+
+        let entries_to_row_indices = (0..num_rows)
+            .into_iter()
+            .filter(|&row_idx| item_is_valid(row_idx)) // exclude null keys
+            .map(|row_idx| (row_idx as u32, (item_hashes[row_idx] as usize % num_entries) as u32))
             .collect::<Vec<_>>();
 
-        // sort record batch by hashes for better compression and data locality
-        // null values are placed in the front of each entry
-        let (indices, item_hashes, item_valids): (Vec<usize>, Vec<i32>, Vec<bool>) = item_hashes
-            .into_iter()
-            .zip(item_valids)
-            .enumerate()
-            .map(|(row_idx, (hash, valid))| (row_idx, hash, valid))
-            .sorted_unstable_by_key(|&(_row_idx, hash, valid)| (hash, valid))
-            .multiunzip();
-        let data_batch = take_batch(data_batch, indices)?;
-
-        let mut entries_to_row_indices: HashMap<u32, Vec<u32>> = HashMap::new();
-        for (row_idx, hash) in item_hashes.iter().enumerate() {
-            let entry = *hash as u32 % num_entries;
-            entries_to_row_indices
-                .entry(entry)
-                .or_default()
-                .push(row_idx as u32);
+        // init entry offsets
+        let mut entry_counts = vec![0; num_entries];
+        for &(_, entry_idx) in &entries_to_row_indices {
+            entry_counts[entry_idx as usize] += 1;
+        }
+        let mut entries = vec![Entry::default(); num_entries];
+        let mut offset = 0;
+        for (entry_idx, entry_count) in entry_counts.into_iter().enumerate() {
+            entries[entry_idx].offset = offset;
+            offset += entry_count;
         }
 
-        let mut entry_offsets = Vec::with_capacity(num_entries as usize);
-        let mut entry_lens = Vec::with_capacity(num_entries as usize);
-        let mut item_indices = Vec::with_capacity(num_rows);
-        for entry in 0..num_entries {
-            match entries_to_row_indices.get(&entry) {
-                Some(row_indices) => {
-                    entry_offsets.push(item_indices.len() as u32);
-                    entry_lens.push(row_indices.len() as u32);
-                    item_indices.extend_from_slice(row_indices);
-                }
-                None => {
-                    entry_offsets.push(item_indices.len() as u32);
-                    entry_lens.push(0);
-                }
+        let mut items = vec![Item::default(); num_rows];
+        for (row_idx, entry_idx) in entries_to_row_indices {
+            if !item_is_valid(row_idx as usize) {
+                continue;
             }
+            let entry = &mut entries[entry_idx as usize];
+            entry.len += 1;
 
-            // exclude null values from entry
-            let cur_entry_idx = entry_offsets.len() - 1;
-            while entry_lens[cur_entry_idx] > 0
-                && !item_valids[item_indices[entry_offsets[cur_entry_idx] as usize] as usize]
-            {
-                entry_offsets[cur_entry_idx] += 1;
-                entry_lens[cur_entry_idx] -= 1;
-            }
+            let item_idx = entry.offset + entry.len - 1;
+            items[item_idx as usize] = Item {
+                idx: row_idx,
+                hash: item_hashes[row_idx as usize],
+            };
         }
-        let new = Self {
-            entry_offsets,
-            entry_lens,
-            item_indices,
-            item_hashes,
-        };
+        items.resize(num_rows, Item::default());
+
+        let new = Self { entries, items };
         Ok((new, data_batch))
     }
 
@@ -127,72 +125,52 @@
         let num_entries = Self::num_entries_of_rows(num_rows);
 
         let mut new = Self {
-            entry_offsets: vec![0; num_entries],
-            entry_lens: vec![0; num_entries],
-            item_indices: vec![0; num_rows],
-            item_hashes: vec![0; num_rows],
+            entries: vec![Entry::default(); num_entries],
+            items: vec![Item::default(); num_rows],
         };
 
         unsafe {
             // safety: read integer arrays as raw bytes
             cursor.read_exact(from_raw_parts_mut(
-                new.entry_offsets.as_mut_ptr() as *mut u8,
-                num_entries * 4,
+                new.entries.as_mut_ptr() as *mut u8,
+                num_entries * 8,
             ))?;
             cursor.read_exact(from_raw_parts_mut(
-                new.entry_lens.as_mut_ptr() as *mut u8,
-                num_entries * 4,
-            ))?;
-            cursor.read_exact(from_raw_parts_mut(
-                new.item_indices.as_mut_ptr() as *mut u8,
-                num_rows * 4,
-            ))?;
-            cursor.read_exact(from_raw_parts_mut(
-                new.item_hashes.as_mut_ptr() as *mut u8,
-                num_rows * 4,
+                new.items.as_mut_ptr() as *mut u8,
+                num_rows * 8,
             ))?;
         }
         Ok(new)
     }
 
     pub fn try_into_raw_bytes(self) -> Result<Vec<u8>> {
-        let num_entries = self.entry_offsets.len();
-        let num_rows = self.item_indices.len();
-        let mut raw_bytes = Vec::with_capacity(num_entries * 8 + num_rows * 4 + 4);
+        let num_entries = self.entries.len();
+        let num_rows = self.items.len();
+        let mut raw_bytes = Vec::with_capacity(num_entries * 8 + num_rows * 8 + 4);
 
         raw_bytes.write_u32::<NativeEndian>(num_rows as u32)?;
         unsafe {
             // safety: write integer arrays as raw bytes
             raw_bytes.write_all(from_raw_parts(
-                self.entry_offsets.as_ptr() as *const u8,
-                num_entries * 4,
+                self.entries.as_ptr() as *const u8,
+                num_entries * 8,
             ))?;
             raw_bytes.write_all(from_raw_parts(
-                self.entry_lens.as_ptr() as *const u8,
-                num_entries * 4,
-            ))?;
-            raw_bytes.write_all(from_raw_parts(
-                self.item_indices.as_ptr() as *const u8,
-                num_rows * 4,
-            ))?;
-            raw_bytes.write_all(from_raw_parts(
-                self.item_hashes.as_ptr() as *const u8,
-                num_rows * 4,
+                self.items.as_ptr() as *const u8,
+                num_rows * 8,
             ))?;
         }
         Ok(raw_bytes)
     }
 
     pub fn entry<'a>(&'a self, hash: i32) -> Option<impl Iterator<Item = u32> + 'a> {
-        let entry = (hash as u32) % (self.entry_offsets.len() as u32);
-        let len = self.entry_lens[entry as usize] as usize;
-        if len > 0 {
-            let offset = self.entry_offsets[entry as usize] as usize;
+        let entry = self.entries[(hash as usize) % self.entries.len()];
+        if entry.len > 0 {
             Some(
-                self.item_indices[offset..][..len]
+                self.items[entry.offset as usize..][..entry.len as usize]
                     .iter()
-                    .cloned()
-                    .filter(move |&idx| self.item_hashes[idx as usize] == hash),
+                    .filter(move |item| item.hash == hash)
+                    .map(move |item| item.idx),
             )
         } else {
             None
@@ -200,7 +178,7 @@
     }
 
     fn num_entries_of_rows(num_rows: usize) -> usize {
-        num_rows * 3 + 1
+        num_rows * 5 + 1
     }
 }
 
@@ -210,6 +188,12 @@
     table: Table,
 }
 
+impl Debug for JoinHashMap {
+    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
+        formatter.write_str("JoinHashMap(..)")
+    }
+}
+
 impl JoinHashMap {
     pub fn try_from_data_batch(
         data_batch: RecordBatch,
@@ -281,36 +265,22 @@
     }
 
     pub fn distinct(&mut self) -> Result<()> {
-        let comparators = self
-            .key_columns
-            .iter()
-            .map(|array| Ok(arrow::array::build_compare(&array, &array)?))
-            .collect::<Result<Vec<_>>>()?;
-        let total_eq = |i, j| {
-            comparators
-                .iter()
-                .all(|comparator| comparator(i, j).is_eq())
-        };
+        let eq = make_eq_comparator_multiple_arrays(&self.key_columns, &self.key_columns)?;
 
-        for entry in 0..self.table.entry_offsets.len() {
-            if self.table.entry_lens[entry] <= 1 {
-                continue;
-            }
-            let entry_offset = self.table.entry_offsets[entry] as usize;
-            let mut entry_end = entry_offset + self.table.entry_lens[entry] as usize;
+        for entry in self.table.entries.iter_mut().filter(|entry| entry.len > 1) {
+            let entry_offset = entry.offset as usize;
+            let mut entry_end = entry_offset + entry.len as usize;
             let mut i = entry_offset + 1;
 
             while i < entry_end {
-                let item_i = self.table.item_indices[i] as usize;
-                let hash_i = self.table.item_hashes[item_i];
+                let item_i = self.table.items[i];
                 let mut removed = false;
 
                 for j in entry_offset..i {
-                    let item_j = self.table.item_indices[j] as usize;
-                    let hash_j = self.table.item_hashes[item_j];
-                    if hash_j == hash_i && total_eq(item_j, item_i) {
+                    let item_j = self.table.items[j];
+                    if item_j.hash == item_i.hash && eq(item_j.idx as usize, item_i.idx as usize) {
                         // remove an duplicated key, remove it from entry
-                        self.table.item_indices.swap(i, entry_end - 1);
+                        self.table.items.swap(i, entry_end - 1);
                         entry_end -= 1;
                         removed = true;
                         break;
@@ -320,7 +290,7 @@
                     i += 1;
                 }
             }
-            self.table.entry_lens[entry] = (entry_end - entry_offset) as u32;
+            entry.len = (entry_end - entry_offset) as u32;
         }
         Ok(())
     }
@@ -337,6 +307,10 @@
         &self.key_columns
     }
 
+    pub fn num_entries(&self) -> usize {
+        self.table.entries.len()
+    }
+
     pub fn entry_indices<'a>(&'a self, hash: i32) -> Option<impl Iterator<Item = u32> + 'a> {
         self.table.entry(hash)
     }
@@ -385,9 +359,11 @@
 
 #[inline]
 pub fn join_create_hashes(num_rows: usize, key_columns: &[ArrayRef]) -> Result<Vec<i32>> {
-    const JOIN_HASH_RANDOM_SEED: i32 = 0x90ec4058u32 as i32;
+    const JOIN_HASH_RANDOM_SEED: i32 = 0x30ec4058i32;
     let mut hashes = vec![JOIN_HASH_RANDOM_SEED; num_rows];
-    create_murmur3_hashes(key_columns, &mut hashes)?;
+    create_hashes(key_columns, &mut hashes, |v, h| {
+        gxhash::gxhash32(v, h as i64) as i32
+    })?;
     Ok(hashes)
 }
 
diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs
index f23c6e9..fbed84c 100644
--- a/native-engine/datafusion-ext-plans/src/joins/test.rs
+++ b/native-engine/datafusion-ext-plans/src/joins/test.rs
@@ -36,7 +36,6 @@
     use crate::{
         broadcast_join_build_hash_map_exec::BroadcastJoinBuildHashMapExec,
         broadcast_join_exec::BroadcastJoinExec,
-        hash_join_exec::HashJoinExec,
         joins::join_utils::{JoinType, JoinType::*},
         sort_merge_join_exec::SortMergeJoinExec,
     };
@@ -215,6 +214,7 @@
                     on,
                     join_type,
                     JoinSide::Right,
+                    true,
                     None,
                 )?)
             }
@@ -230,24 +230,29 @@
                     on,
                     join_type,
                     JoinSide::Left,
+                    true,
                     None,
                 )?)
             }
-            SHJLeftProbed => Arc::new(HashJoinExec::try_new(
-                schema,
-                left,
-                right,
-                on,
-                join_type,
-                JoinSide::Left,
-            )?),
-            SHJRightProbed => Arc::new(HashJoinExec::try_new(
+            SHJLeftProbed => Arc::new(BroadcastJoinExec::try_new(
                 schema,
                 left,
                 right,
                 on,
                 join_type,
                 JoinSide::Right,
+                false,
+                None,
+            )?),
+            SHJRightProbed => Arc::new(BroadcastJoinExec::try_new(
+                schema,
+                left,
+                right,
+                on,
+                join_type,
+                JoinSide::Left,
+                false,
+                None,
             )?),
         };
         let columns = columns(&join.schema());
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs
index bf2ade5..42d05cf 100644
--- a/native-engine/datafusion-ext-plans/src/lib.rs
+++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -25,7 +25,6 @@
 pub mod ffi_reader_exec;
 pub mod filter_exec;
 pub mod generate_exec;
-pub mod hash_join_exec;
 pub mod ipc_reader_exec;
 pub mod ipc_writer_exec;
 pub mod limit_exec;
@@ -49,3 +48,10 @@
 pub mod joins;
 mod shuffle;
 pub mod window;
+
+#[macro_export]
+macro_rules! unchecked {
+    ($expr:expr) => {
+        unsafe { unchecked_index::unchecked_index($expr) }
+    };
+}
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs
index ee84f74..28ba870 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs
@@ -23,7 +23,7 @@
     array_size::ArraySize,
     compute_suggested_batch_size_for_output,
     ds::rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree},
-    rdxsort::radix_sort_u16_ranged_by,
+    rdxsort::radix_sort_unstable_by_key,
     staging_mem_size_for_partial_sort,
 };
 use jni::objects::GlobalRef;
@@ -116,7 +116,8 @@
             // write all batches with this part id
             let mut writer = IpcCompressionWriter::new(CountWrite::from(&mut w), true);
             while iter.cur_part_id() == cur_part_id {
-                writer.write_batch(iter.next_batch())?;
+                let batch = iter.next_batch();
+                writer.write_batch(batch.num_rows(), batch.columns())?;
             }
             offset += writer.finish_into_inner()?.count();
             offsets.push(offset);
@@ -156,7 +157,8 @@
 
             // write all batches with this part id
             while iter.cur_part_id() == cur_part_id {
-                writer.write_batch(iter.next_batch())?;
+                let batch = iter.next_batch();
+                writer.write_batch(batch.num_rows(), batch.columns())?;
             }
             writer.finish_into_inner()?;
         }
@@ -260,7 +262,6 @@
     batches: Vec<RecordBatch>,
     partitioning: &Partitioning,
 ) -> Result<(Vec<u32>, RecordBatch)> {
-    let num_rows = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
     let num_partitions = partitioning.partition_count();
     let schema = batches[0].schema();
 
@@ -277,12 +278,11 @@
         })
         .collect::<Vec<_>>();
 
-    // use quick sort if there are too many partitions or too few rows, otherwise
-    // use radix sort
-    if num_partitions < 65536 && num_rows >= num_partitions {
-        radix_sort_u16_ranged_by(&mut indices, num_partitions, |v| v.0 as u16);
+    // sort indices by radix sort
+    if num_partitions < 65536 {
+        radix_sort_unstable_by_key(&mut indices, |v| v.0 as u16);
     } else {
-        indices.sort_unstable_by_key(|v| v.0);
+        radix_sort_unstable_by_key(&mut indices, |v| v.0);
     }
 
     // get sorted batches
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs
index 6dc927e..b350c26 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_single_repartitioner.rs
@@ -44,9 +44,13 @@
 impl ShuffleRepartitioner for RssSingleShuffleRepartitioner {
     async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
         let rss_partition_writer = self.rss_partition_writer.clone();
-        tokio::task::spawn_blocking(move || rss_partition_writer.lock().write_batch(input))
-            .await
-            .or_else(|err| df_execution_err!("{err}"))??;
+        tokio::task::spawn_blocking(move || {
+            rss_partition_writer
+                .lock()
+                .write_batch(input.num_rows(), input.columns())
+        })
+        .await
+        .or_else(|err| df_execution_err!("{err}"))??;
         Ok(())
     }
 
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs
index 04c5b6a..810b94d 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs
@@ -70,7 +70,7 @@
         let _timer = self.metrics.elapsed_compute().timer();
         let mut output_data = self.output_data.lock().await;
         let output_writer = self.get_output_writer(&mut *output_data)?;
-        output_writer.write_batch(input)?;
+        output_writer.write_batch(input.num_rows(), input.columns())?;
         Ok(())
     }
 
@@ -85,14 +85,12 @@
             let mut output_index = File::create(&self.output_index_file)?;
             output_index.write_all(&[0u8; 8])?;
             output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
-            output_index.sync_data()?;
         } else {
             // write empty data file and index file
             let output_data = File::create(&self.output_data_file)?;
             output_data.set_len(0)?;
             let mut output_index = File::create(&self.output_index_file)?;
             output_index.write_all(&[0u8; 16])?;
-            output_index.sync_data()?;
         }
         Ok(())
     }
diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs
index dbc3dad..bf6b16b 100644
--- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs
+++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs
@@ -20,6 +20,7 @@
 
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use bytesize::ByteSize;
 use datafusion::{
     common::{DataFusionError, Result},
     physical_plan::{metrics::ExecutionPlanMetricsSet, Partitioning},
@@ -127,7 +128,14 @@
 
         // we are likely to spill more frequently because the cost of spilling a shuffle
         // repartition is lower than other consumers.
-        if self.mem_used_percent() > 0.8 {
+        let mem_used_percent = self.mem_used_percent();
+        if mem_used_percent > 0.8 {
+            log::info!(
+                "{} memory usage: {}, percent: {:.3}, spilling...",
+                self.name(),
+                ByteSize(mem_used as u64),
+                mem_used_percent,
+            );
             self.spill().await?;
         }
         Ok(())
@@ -158,15 +166,11 @@
                     .open(&data_file)?;
 
                 let offsets = data.write(&mut output_data, &partitioning)?;
-                output_data.sync_data()?;
-                output_data.flush()?;
 
                 let mut output_index = File::create(&index_file)?;
                 for offset in offsets {
                     output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
                 }
-                output_index.sync_data()?;
-                output_index.flush()?;
                 Ok::<(), DataFusionError>(())
             })
             .await
@@ -260,8 +264,6 @@
                     min_spill.skip_empty_partitions();
                 }
             }
-            output_data.sync_data()?;
-            output_data.flush()?;
 
             // add one extra offset at last to ease partition length computation
             offsets.resize(num_output_partitions + 1, output_data.stream_position()?);
@@ -270,8 +272,6 @@
             for offset in offsets {
                 output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
             }
-            output_index.sync_data()?;
-            output_index.flush()?;
             Ok::<(), DataFusionError>(())
         })
         .await
diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs
index 56a8ec6..ff70f7d 100644
--- a/native-engine/datafusion-ext-plans/src/sort_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs
@@ -48,7 +48,7 @@
     compute_suggested_batch_size_for_kway_merge, compute_suggested_batch_size_for_output,
     downcast_any,
     ds::loser_tree::{ComparableForLoserTree, LoserTree},
-    io::{read_len, read_one_batch, write_len, write_one_batch},
+    io::{read_len, read_one_batch, recover_named_batch, write_len, write_one_batch},
     streams::coalesce_stream::CoalesceInput,
 };
 use futures::{lock::Mutex, stream::once, StreamExt, TryStreamExt};
@@ -259,9 +259,7 @@
         for (key_collector, batch) in
             self.into_sorted_batches::<SqueezeKeyCollector>(sub_batch_size, sorter)?
         {
-            let mut buf = vec![];
-            write_one_batch(&batch, &mut Cursor::new(&mut buf))?;
-            writer.write_all(&buf)?;
+            write_one_batch(batch.num_rows(), batch.columns(), &mut writer)?;
             writer.write_all(&key_collector.store)?;
         }
         Ok(())
@@ -733,7 +731,8 @@
     }
 
     fn load_next_batch(&mut self) -> Result<bool> {
-        if let Some(batch) = read_one_batch(&mut self.input, &self.pruned_schema)? {
+        if let Some((num_rows, cols)) = read_one_batch(&mut self.input)? {
+            let batch = recover_named_batch(num_rows, &cols, self.pruned_schema.clone())?;
             self.cur_mem_used += batch.get_array_mem_size();
             self.cur_batch_num_rows = batch.num_rows();
             self.cur_loaded_num_rows = 0;
@@ -930,9 +929,11 @@
     )?;
 
     while let Some((key_collector, pruned_batch)) = merger.next().transpose()? {
-        let mut buf = vec![];
-        write_one_batch(&pruned_batch, &mut Cursor::new(&mut buf))?;
-        output_writer.write_all(&buf)?;
+        write_one_batch(
+            pruned_batch.num_rows(),
+            pruned_batch.columns(),
+            &mut output_writer,
+        )?;
         output_writer.write_all(&key_collector.store)?;
     }
     drop(output_writer);
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala
index 4c94299..4eda535 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala
@@ -115,15 +115,15 @@
 class ShimsImpl extends Shims with Logging {
 
   @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
-  def shimVersion: String = "spark303"
+  override def shimVersion: String = "spark303"
   @enableIf(Seq("spark320").contains(System.getProperty("blaze.shim")))
-  def shimVersion: String = "spark320"
+  override def shimVersion: String = "spark320"
   @enableIf(Seq("spark324").contains(System.getProperty("blaze.shim")))
-  def shimVersion: String = "spark324"
+  override def shimVersion: String = "spark324"
   @enableIf(Seq("spark333").contains(System.getProperty("blaze.shim")))
-  def shimVersion: String = "spark333"
+  override def shimVersion: String = "spark333"
   @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim")))
-  def shimVersion: String = "spark351"
+  override def shimVersion: String = "spark351"
 
   @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
   override def initExtension(): Unit = {
@@ -717,18 +717,10 @@
       e: Expression,
       isPruningExpr: Boolean,
       fallback: Expression => pb.PhysicalExprNode): Option[pb.PhysicalExprNode] = {
-    import org.apache.spark.sql.catalyst.expressions.{Cast, PromotePrecision}
+    import org.apache.spark.sql.catalyst.expressions.PromotePrecision
     e match {
       case PromotePrecision(_1) =>
-        Some(_1 match {
-          case cast: Cast if cast.dataType == _1.dataType =>
-            NativeConverters.convertExprWithFallback(_1, isPruningExpr, fallback)
-          case _ =>
-            NativeConverters.convertExprWithFallback(
-              Cast(_1, _1.dataType),
-              isPruningExpr,
-              fallback)
-        })
+        Some(NativeConverters.convertExprWithFallback(_1, isPruningExpr, fallback))
       case _ => None
     }
   }
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala
index 3a6906b..a40433d 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala
@@ -40,8 +40,6 @@
       override protected def outputExpressions: Seq[NamedExpression] = output
 
       override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
-
-      override def nodeName: String = "NativeRenameColumnsExec"
     }
     NativeRenameColumnsExec(child, renamedColumnNames)
   }
@@ -66,8 +64,6 @@
       override protected def outputExpressions: Seq[NamedExpression] = output
 
       override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
-
-      override def nodeName: String = "NativeRenameColumnsExec"
     }
     NativeRenameColumnsExec(child, renamedColumnNames)
   }
@@ -81,8 +77,6 @@
 
       override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
         copy(child = newChildren.head)
-
-      override def nodeName: String = "NativeRenameColumnsExec"
     }
     NativeRenameColumnsExec(child, renamedColumnNames)
   }
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala
index a7d7eef..f2f86b4 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala
@@ -69,8 +69,6 @@
       writeMetrics ++
       Map("dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))).toMap
 
-  Math.max(child.outputPartitioning.numPartitions * outputPartitioning.numPartitions, 1)
-
   // 'mapOutputStatisticsFuture' is only needed when enable AQE.
   @transient override lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
     if (inputRDD.getNumPartitions == 0) {
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala
index f19f3b8..bbc7f1d 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala
@@ -71,7 +71,12 @@
     import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
 
     def mode = HashedRelationBroadcastMode(buildBoundKeys, isNullAware = false)
-    BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil
+    broadcastSide match {
+      case BroadcastLeft =>
+        BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil
+      case BroadcastRight =>
+        UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil
+    }
   }
 
   @enableIf(
diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
index b6cfe3d..3080fe2 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
@@ -70,7 +70,10 @@
     PARQUET_ENABLE_BLOOM_FILTER("spark.blaze.parquet.enable.bloomFilter", false),
 
     // spark io compression codec
-    SPARK_IO_COMPRESSION_CODEC("spark.io.compression.codec", "lz4");
+    SPARK_IO_COMPRESSION_CODEC("spark.io.compression.codec", "lz4"),
+
+    // replace all sort-merge joins with shuffled-hash joins; only used for benchmarking
+    FORCE_SHUFFLED_HASH_JOIN("spark.blaze.forceShuffledHashJoin", false);
 
     private final String key;
     private final Object defaultValue;
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala
index 0da5b3d..aace9f3 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala
@@ -38,6 +38,7 @@
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.execution.GenerateExec
 import org.apache.spark.sql.execution.LocalTableScanExec
+import org.apache.spark.sql.execution.blaze.plan.BuildSide
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
@@ -49,6 +50,7 @@
   val convertStrategyTag: TreeNodeTag[ConvertStrategy] = TreeNodeTag("blaze.convert.strategy")
   val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag(
     "blaze.child.ordering.required")
+  val joinSmallerSideTag: TreeNodeTag[BuildSide] = TreeNodeTag("blaze.join.smallerSide")
 
   def apply(exec: SparkPlan): Unit = {
     exec.foreach(_.setTagValue(convertibleTag, true))
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala
index 3491818..fc41359 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala
@@ -26,6 +26,7 @@
 import org.apache.spark.sql.blaze.BlazeConvertStrategy.convertibleTag
 import org.apache.spark.sql.blaze.BlazeConvertStrategy.convertStrategyTag
 import org.apache.spark.sql.blaze.BlazeConvertStrategy.isNeverConvert
+import org.apache.spark.sql.blaze.BlazeConvertStrategy.joinSmallerSideTag
 import org.apache.spark.sql.blaze.NativeConverters.StubExpr
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -132,6 +133,8 @@
       exec
         .getTagValue(childOrderingRequiredTag)
         .foreach(newExec.setTagValue(childOrderingRequiredTag, _))
+      exec.getTagValue(joinSmallerSideTag).foreach(newExec.setTagValue(joinSmallerSideTag, _))
+
       if (!isNeverConvert(newExec)) {
         newExec = convertSparkPlan(newExec)
       }
@@ -336,6 +339,40 @@
   }
 
   def convertSortMergeJoinExec(exec: SortMergeJoinExec): SparkPlan = {
+    val requireOrdering = exec.getTagValue(childOrderingRequiredTag).contains(true)
+
+    // force shuffled-hash join
+    if (!requireOrdering
+      && BlazeConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()
+      && exec.children.forall(_.isInstanceOf[NativeSortBase])) {
+      val (leftKeys, rightKeys, joinType, condition, left, right) =
+        (exec.leftKeys, exec.rightKeys, exec.joinType, exec.condition, exec.left, exec.right)
+      logDebug(
+        s"Converting SortMergeJoinExec (with forceShuffledHashJoin): ${Shims.get.simpleStringWithNodeId(exec)}")
+      logDebug(s"  leftKeys: $leftKeys")
+      logDebug(s"  rightKeys: $rightKeys")
+      logDebug(s"  joinType: $joinType")
+      logDebug(s"  condition: $condition")
+      assert(condition.isEmpty, "join condition is not supported")
+
+      val buildSide = exec.getTagValue(joinSmallerSideTag) match {
+        case Some(org.apache.spark.sql.execution.blaze.plan.BuildLeft) =>
+          org.apache.spark.sql.execution.blaze.plan.BuildLeft
+        case Some(org.apache.spark.sql.execution.blaze.plan.BuildRight) =>
+          org.apache.spark.sql.execution.blaze.plan.BuildRight
+        case None =>
+          logWarning("JoinSmallerSideTag is missing, defaults to BuildRight")
+          org.apache.spark.sql.execution.blaze.plan.BuildRight
+      }
+      return Shims.get.createNativeShuffledHashJoinExec(
+        addRenameColumnsExec(convertToNative(left.children(0))),
+        addRenameColumnsExec(convertToNative(right.children(0))),
+        leftKeys,
+        rightKeys,
+        joinType,
+        buildSide)
+    }
+
     val (leftKeys, rightKeys, joinType, condition, left, right) =
       (exec.leftKeys, exec.rightKeys, exec.joinType, exec.condition, exec.left, exec.right)
     logDebug(s"Converting SortMergeJoinExec: ${Shims.get.simpleStringWithNodeId(exec)}")
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeSparkSessionExtension.scala
index 45d75b9..663e159 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeSparkSessionExtension.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeSparkSessionExtension.scala
@@ -15,6 +15,7 @@
  */
 package org.apache.spark.sql.blaze
 
+import org.apache.gluten.extension.BlazeStrategyOverrides
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
@@ -37,6 +38,9 @@
     assert(BlazeSparkSessionExtension.blazeEnabledKey != null)
     Shims.get.onApplyingExtension()
 
+    extensions.injectPlannerStrategy(sparkSession => {
+      BlazeStrategyOverrides(sparkSession)
+    })
     extensions.injectColumnar(sparkSession => {
       BlazeColumnarOverrides(sparkSession)
     })
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeStrategyOverrides.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeStrategyOverrides.scala
new file mode 100644
index 0000000..de8b234
--- /dev/null
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeStrategyOverrides.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2022 The Blaze Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.blaze.BlazeConvertStrategy
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.blaze.plan.BuildLeft
+import org.apache.spark.sql.execution.blaze.plan.BuildRight
+import org.apache.spark.sql.blaze.BlazeConf
+import org.apache.spark.sql.blaze.Shims
+
+// Planner strategy for spark.blaze.forceShuffledHashJoin: tags joins with their smaller side.
+case class BlazeStrategyOverrides(session: SparkSession) extends Strategy {
+  // NOTE(review): the planner comes from SparkSession.active, not `session` -- confirm intended
+  private val planner = SparkSession.active.sessionState.planner
+
+  // Yields Nil unless forceShuffledHashJoin is on; then the (tagged) JoinSelection result.
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    if (!BlazeConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) {
+      Nil
+    } else {
+      if (Shims.get.shimVersion < "spark324") {
+        throw new RuntimeException(
+          "spark.blaze.forceShuffledHashJoin is only used for benchmarking " +
+            "and requires spark version >= 3.2.4")
+      }
+      markSmallerJoinSide(plan)
+    }
+  }
+
+  // Runs JoinSelection; a lone two-child result is tagged with its smaller logical child.
+  private def markSmallerJoinSide(plan: LogicalPlan): Seq[SparkPlan] = {
+    planner.JoinSelection.apply(plan) match {
+      case Seq(physicalPlan) if physicalPlan.children.length == 2 =>
+        val left = plan.children(0)
+        val right = plan.children(1)
+        val rightIsSmaller = right.stats.sizeInBytes <= left.stats.sizeInBytes
+        val smallerSide = if (rightIsSmaller) BuildRight else BuildLeft
+        physicalPlan.setTagValue(BlazeConvertStrategy.joinSmallerSideTag, smallerSide)
+        Seq(physicalPlan)
+      // any other result passes through untagged rather than raising a MatchError
+      case other => other
+    }
+  }
+}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeConverters.scala
index 81c5fa2..7675527 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeConverters.scala
@@ -22,6 +22,8 @@
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.math.max
+import scala.math.min
 
 import com.google.protobuf.ByteString
 import org.apache.spark.SparkEnv
@@ -57,6 +59,7 @@
 import org.apache.spark.sql.execution.ScalarSubquery
 import org.apache.spark.sql.hive.blaze.HiveUDFUtil
 import org.apache.spark.sql.hive.blaze.HiveUDFUtil.getFunctionClassName
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.ArrayType
 import org.apache.spark.sql.types.AtomicType
 import org.apache.spark.sql.types.BinaryType
@@ -554,7 +557,21 @@
         val lhs = e.left
         val rhs = e.right
         val resultType = e.dataType
-        if (lhs.dataType.isInstanceOf[DecimalType] || rhs.dataType.isInstanceOf[DecimalType]) {
+        if (lhs.dataType.isInstanceOf[DecimalType] && rhs.dataType.isInstanceOf[DecimalType]) {
+          def resultDecimalType(p1: Int, s1: Int, p2: Int, s2: Int): DecimalType = {
+            val resultScale = max(s1, s2)
+            val resultPrecision = max(p1 - s1, p2 - s2) + resultScale + 1
+            if (SQLConf.get.decimalOperationsAllowPrecisionLoss) {
+              DecimalType.adjustPrecisionScale(resultPrecision, resultScale)
+            } else {
+              DecimalType.bounded(resultPrecision, resultScale)
+            }
+          }
+          val resultType = (lhs.dataType, rhs.dataType) match {
+            case (lhsType: DecimalType, rhsType: DecimalType) =>
+              resultDecimalType(lhsType.precision, lhsType.scale, rhsType.precision, rhsType.scale)
+          }
+
           buildExprNode {
             _.setCast(pb.PhysicalCastNode
               .newBuilder()
@@ -576,7 +593,22 @@
         val lhs = e.left
         val rhs = e.right
         val resultType = e.dataType
-        if (lhs.dataType.isInstanceOf[DecimalType] || rhs.dataType.isInstanceOf[DecimalType]) {
+        if (lhs.dataType.isInstanceOf[DecimalType] && rhs.dataType.isInstanceOf[DecimalType]) {
+          // copied from spark3.5
+          def resultDecimalType(p1: Int, s1: Int, p2: Int, s2: Int): DecimalType = {
+            val resultScale = max(s1, s2) // larger fractional width of the two operands
+            val resultPrecision = max(p1 - s1, p2 - s2) + resultScale + 1 // one extra digit
+            if (SQLConf.get.decimalOperationsAllowPrecisionLoss) { // read from the active SQLConf
+              DecimalType.adjustPrecisionScale(resultPrecision, resultScale)
+            } else {
+              DecimalType.bounded(resultPrecision, resultScale)
+            }
+          }
+          val resultType = (lhs.dataType, rhs.dataType) match {
+            case (lhsType: DecimalType, rhsType: DecimalType) =>
+              resultDecimalType(lhsType.precision, lhsType.scale, rhsType.precision, rhsType.scale)
+          }
+
           buildExprNode {
             _.setCast(pb.PhysicalCastNode
               .newBuilder()
@@ -597,8 +629,22 @@
       case e: Multiply =>
         val lhs = e.left
         val rhs = e.right
-        val resultType = e.dataType
-        if (lhs.dataType.isInstanceOf[DecimalType] || rhs.dataType.isInstanceOf[DecimalType]) {
+        if (lhs.dataType.isInstanceOf[DecimalType] && rhs.dataType.isInstanceOf[DecimalType]) {
+          // copied from spark3.5
+          def resultDecimalType(p1: Int, s1: Int, p2: Int, s2: Int): DecimalType = {
+            val resultScale = s1 + s2 // fractional widths of both operands add up
+            val resultPrecision = p1 + p2 + 1 // both precisions combined plus one digit
+            if (SQLConf.get.decimalOperationsAllowPrecisionLoss) { // read from the active SQLConf
+              DecimalType.adjustPrecisionScale(resultPrecision, resultScale)
+            } else {
+              DecimalType.bounded(resultPrecision, resultScale)
+            }
+          }
+          val resultType = (lhs.dataType, rhs.dataType) match {
+            case (lhsType: DecimalType, rhsType: DecimalType) =>
+              resultDecimalType(lhsType.precision, lhsType.scale, rhsType.precision, rhsType.scale)
+          }
+
           buildExprNode {
             _.setCast(pb.PhysicalCastNode
               .newBuilder()
@@ -619,8 +665,30 @@
       case e: Divide =>
         val lhs = e.left
         val rhs = e.right
-        if (lhs.dataType.isInstanceOf[DecimalType] || rhs.dataType.isInstanceOf[DecimalType]) {
-          val resultType = e.dataType
+        if (lhs.dataType.isInstanceOf[DecimalType] && rhs.dataType.isInstanceOf[DecimalType]) {
+          // copied from spark3.5
+          def resultDecimalType(p1: Int, s1: Int, p2: Int, s2: Int): DecimalType = {
+            if (SQLConf.get.decimalOperationsAllowPrecisionLoss) { // read from the active SQLConf
+              val intDig = p1 - s1 + s2 // digits budgeted for the integer part
+              val scale = max(DecimalType.MINIMUM_ADJUSTED_SCALE, s1 + p2 + 1) // floor on scale
+              val prec = intDig + scale
+              DecimalType.adjustPrecisionScale(prec, scale)
+            } else {
+              var intDig = min(DecimalType.MAX_SCALE, p1 - s1 + s2) // capped integer digits
+              var decDig = min(DecimalType.MAX_SCALE, max(6, s1 + p2 + 1)) // at least 6, capped
+              val diff = (intDig + decDig) - DecimalType.MAX_SCALE // digits above the cap
+              if (diff > 0) {
+                decDig -= diff / 2 + 1 // fraction gives up a bit more than half of the excess
+                intDig = DecimalType.MAX_SCALE - decDig // integer part takes what remains
+              }
+              DecimalType.bounded(intDig + decDig, decDig)
+            }
+          }
+          val resultType = (lhs.dataType, rhs.dataType) match {
+            case (lhsType: DecimalType, rhsType: DecimalType) =>
+              resultDecimalType(lhsType.precision, lhsType.scale, rhsType.precision, rhsType.scale)
+          }
+
           buildExprNode {
             _.setCast(pb.PhysicalCastNode
               .newBuilder()
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala
index 6cd79dd..b65bdc9 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala
@@ -95,7 +95,9 @@
       "output_rows" -> SQLMetrics.createMetric(sc, "Native.output_rows"),
       "output_batches" -> SQLMetrics.createMetric(sc, "Native.output_batches"),
       "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "Native.elapsed_compute"),
-      "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Native.join_time"),
+      "build_hash_map_time" -> SQLMetrics.createNanoTimingMetric(
+        sc,
+        "Native.build_hash_map_time"),
       "mem_spill_count" -> SQLMetrics.createMetric(sc, "Native.mem_spill_count"),
       "mem_spill_size" -> SQLMetrics.createSizeMetric(sc, "Native.mem_spill_size"),
       "mem_spill_iotime" -> SQLMetrics.createNanoTimingMetric(sc, "Native.mem_spill_iotime"),
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/Shims.scala
index 7657500..823f7e4 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/Shims.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/Shims.scala
@@ -53,6 +53,8 @@
 
 abstract class Shims {
 
+  def shimVersion: String
+
   def initExtension(): Unit = {}
 
   def onApplyingExtension(): Unit = {}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/SparkUDTFWrapperContext.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/SparkUDTFWrapperContext.scala
index 0a075a2..d62771d 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/SparkUDTFWrapperContext.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/SparkUDTFWrapperContext.scala
@@ -102,22 +102,20 @@
     Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { batchAllocator =>
       Using.resources(
         VectorSchemaRoot.create(outputSchema, batchAllocator),
-        ArrowArray.wrap(exportFFIArrayPtr)) {
-        (outputRoot, exportArray) =>
+        ArrowArray.wrap(exportFFIArrayPtr)) { (outputRoot, exportArray) =>
+        // evaluate expression and write to output root
+        val outputWriter = ArrowWriter.create(outputRoot)
+        for (outputRow <- expr.terminate()) {
+          outputWriter.write(InternalRow(rowId, outputRow))
+        }
+        outputWriter.finish()
 
-          // evaluate expression and write to output root
-          val outputWriter = ArrowWriter.create(outputRoot)
-          for (outputRow <- expr.terminate()) {
-            outputWriter.write(InternalRow(rowId, outputRow))
-          }
-          outputWriter.finish()
-
-          // export to output using root allocator
-          Data.exportVectorSchemaRoot(
-            ArrowUtils.rootAllocator,
-            outputRoot,
-            dictionaryProvider,
-            exportArray)
+        // export to output using root allocator
+        Data.exportVectorSchemaRoot(
+          ArrowUtils.rootAllocator,
+          outputRoot,
+          dictionaryProvider,
+          exportArray)
       }
     }
   }
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeBase.scala
index 6fcbd47..d198099 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeBase.scala
@@ -15,10 +15,8 @@
  */
 package org.apache.spark.sql.execution.blaze.plan
 
-import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 import java.nio.ByteBuffer
-import java.nio.channels.Channels
 import java.util.UUID
 import java.util.concurrent.Future
 import java.util.concurrent.TimeoutException
@@ -58,6 +56,7 @@
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.blaze.plan.NativeBroadcastExchangeBase.buildBroadcastData
+import org.apache.spark.sql.execution.blaze.shuffle.BlockObject
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
 import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
@@ -168,9 +167,12 @@
       (_, _) => {
         val resourceId = s"ArrowBroadcastExchangeExec:${UUID.randomUUID()}"
         val provideIpcIterator = () => {
-          broadcast.value.iterator.map(bytes => {
-            Channels.newChannel(new ByteArrayInputStream(bytes))
-          })
+          broadcast.value.iterator.map(bytes =>
+            new BlockObject {
+              override def hasByteBuffer: Boolean = true
+              override def getByteBuffer: ByteBuffer = ByteBuffer.wrap(bytes)
+              override def close(): Unit = {}
+            })
         }
 
         JniBridge.resourcesMap.put(resourceId, () => provideIpcIterator())
@@ -303,7 +305,6 @@
       .setInput(pb.PhysicalPlanNode.newBuilder().setBroadcastJoinBuildHashMap(buildHashMapExec))
       .setIpcConsumerResourceId(writerIpcProviderResourceId)
 
-    // build native sorter
     val exec = pb.PhysicalPlanNode
       .newBuilder()
       .setIpcWriter(writerExec)
@@ -311,10 +312,12 @@
 
     // input
     val provideIpcIterator = () => {
-      collectedData.iterator.map { ipc =>
-        val inputStream = new ByteArrayInputStream(ipc)
-        Channels.newChannel(inputStream)
-      }
+      collectedData.iterator.map(bytes =>
+        new BlockObject {
+          override def hasByteBuffer: Boolean = true
+          override def getByteBuffer: ByteBuffer = ByteBuffer.wrap(bytes)
+          override def close(): Unit = {}
+        })
     }
     JniBridge.resourcesMap.put(readerIpcProviderResourceId, () => provideIpcIterator())
 
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastJoinBase.scala
index dc27d2a..c07973f 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastJoinBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastJoinBase.scala
@@ -53,6 +53,14 @@
   override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
     NativeHelper
       .getDefaultNativeMetrics(sparkContext)
+      .filterKeys(
+        Set(
+          "stage_id",
+          "output_rows",
+          "elapsed_compute",
+          "input_batch_count",
+          "input_batch_mem_size",
+          "input_row_count")) // every other default native metric is filtered out
       .toSeq: _*)
 
   private val isLongHashRelation = {
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffledHashJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffledHashJoinBase.scala
index 6812651..fa0cdc1 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffledHashJoinBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffledHashJoinBase.scala
@@ -52,6 +52,7 @@
           "stage_id",
           "output_rows",
           "elapsed_compute",
+          "build_hash_map_time",
           "input_batch_count",
           "input_batch_mem_size",
           "input_row_count"))
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReaderBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReaderBase.scala
index d8c5b55..49f069d 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReaderBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReaderBase.scala
@@ -15,24 +15,27 @@
  */
 package org.apache.spark.sql.execution.blaze.shuffle
 
-import java.io.File
 import java.io.FileInputStream
 import java.io.FilterInputStream
 import java.io.InputStream
 import java.lang.reflect.Field
 import java.lang.reflect.Method
 import java.nio.channels.Channels
+import java.nio.channels.ReadableByteChannel
+import java.nio.ByteBuffer
+
+import scala.annotation.tailrec
 
 import org.apache.spark.InterruptibleIterator
 import org.apache.spark.ShuffleDependency
 import org.apache.spark.TaskContext
-
 import org.apache.spark.network.util.LimitedInputStream
 import org.apache.spark.shuffle.BaseShuffleHandle
 import org.apache.spark.shuffle.ShuffleReader
-import org.apache.spark.sql.blaze.Shims
 import org.apache.spark.storage.BlockId
-import org.apache.spark.storage.FileSegment
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufInputStream
 
 abstract class BlazeBlockStoreShuffleReaderBase[K, C](
     handle: BaseShuffleHandle[K, _, C],
@@ -43,18 +46,13 @@
   protected val dep: ShuffleDependency[K, _, C] = handle.dependency
   protected def readBlocks(): Iterator[(BlockId, InputStream)]
 
-  def readIpc(): Iterator[Object] = { // FileSegment | ReadableByteChannel
+  def readIpc(): Iterator[BlockObject] = {
     val ipcIterator = readBlocks().map { case (_, inputStream) =>
-      getFileSegmentFromInputStream(inputStream) match {
-        case Some(fileSegment) =>
-          fileSegment
-        case None =>
-          Channels.newChannel(inputStream)
-      }
+      createBlockObject(inputStream) // yields a file-segment, byte-buffer or channel block
     }
 
     // An interruptible iterator must be used here in order to support task cancellation
-    new InterruptibleIterator[Object](context, ipcIterator)
+    new InterruptibleIterator[BlockObject](context, ipcIterator)
   }
 
   /** Read the combined key-values for this reduce task */
@@ -64,32 +62,85 @@
 }
 
 object BlazeBlockStoreShuffleReaderBase {
-  private val bufferReleasingInputStreamClass: Class[_] =
+  def createBlockObject(in: InputStream): BlockObject = {
+    // probe order: file segment first, then byte buffer, finally a plain channel
+    def fileSegmentBlock: Option[BlockObject] =
+      getFileSegmentFromInputStream(in).map { case (path, offset, limit) =>
+        new BlockObject {
+          override def hasFileSegment: Boolean = true
+          override def getFilePath: String = path
+          override def getFileOffset: Long = offset
+          override def getFileLength: Long = limit
+          override def close(): Unit = in.close()
+        }
+      }
+    def byteBufferBlock: Option[BlockObject] =
+      getByteBufferFromInputStream(in).map { buf =>
+        new BlockObject {
+          override def hasByteBuffer: Boolean = true
+          override def getByteBuffer: ByteBuffer = buf
+          override def close(): Unit = in.close()
+        }
+      }
+    // the channel is only created when neither probe recognizes the stream
+    fileSegmentBlock.orElse(byteBufferBlock).getOrElse {
+      val channel = Channels.newChannel(in)
+      new BlockObject {
+        override def getChannel: ReadableByteChannel = channel
+        override def close(): Unit = channel.close()
+      }
+    }
+  }
+
+  private lazy val bufferReleasingInputStreamClass: Class[_] = // looked up by name at runtime
     Class.forName("org.apache.spark.storage.BufferReleasingInputStream")
-  private val delegateFn: Method =
+  private lazy val delegateFn: Method = // first declared method whose name ends in "delegate"
     bufferReleasingInputStreamClass.getDeclaredMethods.find(_.getName.endsWith("delegate")).get
-  private val inField: Field = classOf[FilterInputStream].getDeclaredField("in")
-  private val limitField: Field = classOf[LimitedInputStream].getDeclaredField("left")
-  private val pathField: Field = classOf[FileInputStream].getDeclaredField("path")
+  private lazy val inField: Field = classOf[FilterInputStream].getDeclaredField("in")
+  private lazy val limitField: Field = classOf[LimitedInputStream].getDeclaredField("left")
+  private lazy val pathField: Field = classOf[FileInputStream].getDeclaredField("path") // note: the setAccessible calls below force all of these lazy vals
   delegateFn.setAccessible(true)
   inField.setAccessible(true)
   limitField.setAccessible(true)
   pathField.setAccessible(true)
 
-  def getFileSegmentFromInputStream(in: InputStream): Option[FileSegment] = {
-    if (!bufferReleasingInputStreamClass.isInstance(in)) {
-      return None
+  private lazy val timeTrackingInputStreamClass: Class[_] =
+    Class.forName("org.apache.spark.storage.TimeTrackingInputStream")
+  private lazy val inputStreamField = { // only referenced from commented-out code in unwrapInputStream
+    val f = timeTrackingInputStreamClass.getDeclaredField("inputStream")
+    f.setAccessible(true)
+    f
+  }
+
+  private lazy val byteBufInputStreamClass: Class[_] =
+    Class.forName("io.netty.buffer.ByteBufInputStream")
+  private lazy val bufferField = byteBufInputStreamClass.getDeclaredField("buffer")
+  private lazy val startIndexField = byteBufInputStreamClass.getDeclaredField("startIndex")
+  private lazy val endIndexField = byteBufInputStreamClass.getDeclaredField("endIndex")
+  bufferField.setAccessible(true) // these three calls force the lazy vals declared just above
+  startIndexField.setAccessible(true)
+  endIndexField.setAccessible(true)
+
+  @tailrec
+  private def unwrapInputStream(in: InputStream): InputStream = {
+    in match { // strips BufferReleasingInputStream layers until another stream type is reached
+      case in if bufferReleasingInputStreamClass.isInstance(in) =>
+        unwrapInputStream(delegateFn.invoke(in).asInstanceOf[InputStream])
+      // case in: TimeTrackingInputStream =>
+      //  unwrapInputStream(inputStreamField.get(in).asInstanceOf[InputStream])
+      case in => in
     }
-    delegateFn.invoke(in) match {
+  }
+
+  def getFileSegmentFromInputStream(in: InputStream): Option[(String, Long, Long)] = {
+    unwrapInputStream(in) match {
       case in: LimitedInputStream =>
         val limit = limitField.getLong(in)
         inField.get(in) match {
           case in: FileInputStream =>
             val path = pathField.get(in).asInstanceOf[String]
             val offset = in.getChannel.position()
-            val fileSegment =
-              Shims.get.createFileSegment(new File(path), offset, limit, 0)
-            Some(fileSegment)
+            Some((path, offset, limit))
           case _ =>
             None
         }
@@ -97,4 +148,29 @@
         None
     }
   }
+
+  def getByteBufferFromInputStream(in: InputStream): Option[ByteBuffer] = {
+    unwrapInputStream(in) match {
+      case in: ByteBufInputStream =>
+        // only the unread region; index 0 may be stale and nioBuffer handles composite buffers
+        val buf = bufferField.get(in).asInstanceOf[ByteBuf]
+        Some(buf.nioBuffer(buf.readerIndex(), in.available()))
+      case in: InputStreamToByteBuffer => Some(in.toByteBuffer)
+      case _ => None
+    }
+  }
+}
+
+trait InputStreamToByteBuffer { // matched in getByteBufferFromInputStream
+  def toByteBuffer: ByteBuffer // result is handed out unchanged as the block's byte buffer
+}
+
+trait BlockObject extends AutoCloseable { // every accessor below throws unless overridden
+  def hasFileSegment: Boolean = false // createBlockObject sets this along with getFile* accessors
+  def hasByteBuffer: Boolean = false // createBlockObject sets this along with getByteBuffer
+  def getFilePath: String = throw new UnsupportedOperationException
+  def getFileOffset: Long = throw new UnsupportedOperationException
+  def getFileLength: Long = throw new UnsupportedOperationException
+  def getByteBuffer: ByteBuffer = throw new UnsupportedOperationException
+  def getChannel: ReadableByteChannel = throw new UnsupportedOperationException
 }
diff --git a/tpcds/benchmark-runner/src/main/scala/org/apache/spark/sql/execution/benchmark/TPCDSBenchmarkRunner.scala b/tpcds/benchmark-runner/src/main/scala/org/apache/spark/sql/execution/benchmark/TPCDSBenchmarkRunner.scala
index 9bc2abc..739ca1f 100644
--- a/tpcds/benchmark-runner/src/main/scala/org/apache/spark/sql/execution/benchmark/TPCDSBenchmarkRunner.scala
+++ b/tpcds/benchmark-runner/src/main/scala/org/apache/spark/sql/execution/benchmark/TPCDSBenchmarkRunner.scala
@@ -66,6 +66,7 @@
       System.exit(-1)
     }
     Files.createDirectories(outputDir)
+    val logWriter = new PrintWriter(s"$outputDir/log")
 
     // start spark
     val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName)
@@ -83,7 +84,6 @@
     }
 
     // run queries
-    val logWriter = new PrintWriter(s"$outputDir/log")
     var numSucceeded = 0
     var numFailed = 0
     for (query <- queriesToRun) {
@@ -99,6 +99,7 @@
       }
 
       for (round <- 1 to benchmarkArgs.round) {
+        System.err.println(s"########## running case: $query, round: $round ##########")
         spark.sparkContext.setJobDescription(s"case: $query, round: $round")
         var rows: Array[Row] = Array()
         var succeeded = true
diff --git a/tpcds/datagen/src/main/scala/org/apache/spark/sql/execution/benchmark/TPCDSDatagen.scala b/tpcds/datagen/src/main/scala/org/apache/spark/sql/execution/benchmark/TPCDSDatagen.scala
index a3033ff..9647002 100644
--- a/tpcds/datagen/src/main/scala/org/apache/spark/sql/execution/benchmark/TPCDSDatagen.scala
+++ b/tpcds/datagen/src/main/scala/org/apache/spark/sql/execution/benchmark/TPCDSDatagen.scala
@@ -250,7 +250,7 @@
       tables
     }
 
-    withSpecifiedDataType.foreach { table =>
+    withSpecifiedDataType.par.foreach { table =>
       val tableLocation = s"$location/${table.name}"
       table.genData(tableLocation, format, overwrite, clusterByPartitionColumns,
         filterOutNullPartitionValues, numPartitions)