JAVA-1586: Throw underlying exception when codec not found in cache
diff --git a/changelog/README.md b/changelog/README.md
index afd5372..0e268d5 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -4,6 +4,7 @@
 
 ### 4.0.0-alpha1 (in progress)
 
+- [improvement] JAVA-1586: Throw underlying exception when codec not found in cache
 - [bug] JAVA-1583: Handle write failure in ChannelHandlerRequest
 - [improvement] JAVA-1541: Reorganize configuration
 - [improvement] JAVA-1577: Set default consistency level to LOCAL_ONE
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/type/codec/registry/DefaultCodecRegistry.java b/core/src/main/java/com/datastax/oss/driver/internal/core/type/codec/registry/DefaultCodecRegistry.java
index a06ebfc..c842079 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/type/codec/registry/DefaultCodecRegistry.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/type/codec/registry/DefaultCodecRegistry.java
@@ -15,13 +15,17 @@
  */
 package com.datastax.oss.driver.internal.core.type.codec.registry;
 
+import com.datastax.oss.driver.api.core.DriverExecutionException;
 import com.datastax.oss.driver.api.core.type.DataType;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
 import com.datastax.oss.driver.api.core.type.reflect.GenericType;
+import com.google.common.base.Throwables;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.util.Objects;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
@@ -87,7 +91,19 @@
   @Override
   protected TypeCodec<?> getCachedCodec(DataType cqlType, GenericType<?> javaType) {
     LOG.trace("[{}] Checking cache", logPrefix);
-    return cache.getUnchecked(new CacheKey(cqlType, javaType));
+    try {
+      return cache.getUnchecked(new CacheKey(cqlType, javaType));
+    } catch (UncheckedExecutionException | ExecutionError e) {
+      // unwrap exception cause and throw it directly.
+      Throwable cause = e.getCause();
+      if (cause != null) {
+        Throwables.throwIfUnchecked(cause);
+        throw new DriverExecutionException(cause);
+      } else {
+        // Should never happen, throw just in case
+        throw new RuntimeException(e.getMessage());
+      }
+    }
   }
 
   public static final class CacheKey {
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/type/codec/registry/CachingCodecRegistryTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/type/codec/registry/CachingCodecRegistryTest.java
index 0c56b31..3009636 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/type/codec/registry/CachingCodecRegistryTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/type/codec/registry/CachingCodecRegistryTest.java
@@ -27,6 +27,7 @@
 import com.datastax.oss.driver.api.core.type.SetType;
 import com.datastax.oss.driver.api.core.type.TupleType;
 import com.datastax.oss.driver.api.core.type.UserDefinedType;
+import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
 import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
@@ -36,7 +37,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.net.Inet4Address;
@@ -552,19 +552,19 @@
     try {
       CodecRegistry.DEFAULT.codecFor(StringBuilder.class);
       fail("Should not have found a codec for ANY <-> StringBuilder");
-    } catch (UncheckedExecutionException e) {
+    } catch (CodecNotFoundException e) {
       // expected
     }
     try {
       CodecRegistry.DEFAULT.codecFor(DataTypes.TEXT, StringBuilder.class);
       fail("Should not have found a codec for varchar <-> StringBuilder");
-    } catch (UncheckedExecutionException e) {
+    } catch (CodecNotFoundException e) {
       // expected
     }
     try {
       CodecRegistry.DEFAULT.codecFor(new StringBuilder());
       fail("Should not have found a codec for ANY <-> StringBuilder");
-    } catch (UncheckedExecutionException e) {
+    } catch (CodecNotFoundException e) {
       // expected
     }
   }
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/api/core/type/codec/registry/CodecRegistryIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/api/core/type/codec/registry/CodecRegistryIT.java
new file mode 100644
index 0000000..867505b
--- /dev/null
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/api/core/type/codec/registry/CodecRegistryIT.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright (C) 2017-2017 DataStax Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.driver.api.core.type.codec.registry;
+
+import com.datastax.oss.driver.api.core.Cluster;
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.session.Session;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
+import com.datastax.oss.driver.api.core.type.reflect.GenericType;
+import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
+import com.datastax.oss.driver.api.testinfra.cluster.ClusterRule;
+import com.datastax.oss.driver.internal.core.type.codec.IntCodec;
+import java.nio.ByteBuffer;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CodecRegistryIT {
+
+  @ClassRule public static CcmRule ccm = CcmRule.getInstance();
+
+  @ClassRule public static ClusterRule cluster = new ClusterRule(ccm);
+
+  @Rule public TestName name = new TestName();
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @BeforeClass
+  public static void createSchema() {
+    // table with simple primary key, single cell.
+    cluster
+        .session()
+        .execute(
+            SimpleStatement.builder("CREATE TABLE IF NOT EXISTS test (k text primary key, v int)")
+                .withConfigProfile(cluster.slowProfile())
+                .build());
+  }
+
+  // A simple codec that allows float values to be used for cassandra int column type.
+  private static class FloatCIntCodec implements TypeCodec<Float> {
+
+    private static final IntCodec intCodec = new IntCodec();
+
+    @Override
+    public GenericType<Float> getJavaType() {
+      return GenericType.of(Float.class);
+    }
+
+    @Override
+    public DataType getCqlType() {
+      return DataTypes.INT;
+    }
+
+    @Override
+    public ByteBuffer encode(Float value, ProtocolVersion protocolVersion) {
+      return intCodec.encode(value.intValue(), protocolVersion);
+    }
+
+    @Override
+    public Float decode(ByteBuffer bytes, ProtocolVersion protocolVersion) {
+      return intCodec.decode(bytes, protocolVersion).floatValue();
+    }
+
+    @Override
+    public String format(Float value) {
+      return intCodec.format(value.intValue());
+    }
+
+    @Override
+    public Float parse(String value) {
+      return intCodec.parse(value).floatValue();
+    }
+  }
+
+  @Test
+  public void should_throw_exception_if_no_codec_registered_for_type_set() {
+    PreparedStatement prepared = cluster.session().prepare("INSERT INTO test (k, v) values (?, ?)");
+
+    thrown.expect(CodecNotFoundException.class);
+
+    // float value for int column should not work since no applicable codec.
+    prepared.boundStatementBuilder().setString(0, name.getMethodName()).setFloat(1, 3.14f).build();
+  }
+
+  @Test
+  public void should_throw_exception_if_no_codec_registered_for_type_get() {
+    PreparedStatement prepared = cluster.session().prepare("INSERT INTO test (k, v) values (?, ?)");
+
+    BoundStatement insert =
+        prepared.boundStatementBuilder().setString(0, name.getMethodName()).setInt(1, 2).build();
+    cluster.session().execute(insert);
+
+    ResultSet result =
+        cluster
+            .session()
+            .execute(
+                SimpleStatement.builder("SELECT v from TEST where k = ?")
+                    .addPositionalValue(name.getMethodName())
+                    .build());
+
+    assertThat(result.getAvailableWithoutFetching()).isEqualTo(1);
+
+    // should not be able to access int column as float as no codec is registered to handle that.
+    Row row = result.iterator().next();
+
+    thrown.expect(CodecNotFoundException.class);
+
+    assertThat(row.getFloat("v")).isEqualTo(3.0f);
+  }
+
+  @Test
+  public void should_be_able_to_register_and_use_custom_codec() {
+    // create a cluster with a registered codec from Float <-> cql int.
+    try (Cluster codecCluster =
+        Cluster.builder()
+            .addTypeCodecs(new FloatCIntCodec())
+            .addContactPoints(ccm.getContactPoints())
+            .build()) {
+      Session session = codecCluster.connect(CqlIdentifier.fromCql(cluster.keyspace()));
+
+      PreparedStatement prepared = session.prepare("INSERT INTO test (k, v) values (?, ?)");
+
+      // float value for int column should work.
+      BoundStatement insert =
+          prepared
+              .boundStatementBuilder()
+              .setString(0, name.getMethodName())
+              .setFloat(1, 3.14f)
+              .build();
+      session.execute(insert);
+
+      ResultSet result =
+          session.execute(
+              SimpleStatement.builder("SELECT v from TEST where k = ?")
+                  .addPositionalValue(name.getMethodName())
+                  .build());
+
+      assertThat(result.getAvailableWithoutFetching()).isEqualTo(1);
+
+      // should be able to retrieve value back as float, some precision is lost due to going from int -> float.
+      Row row = result.iterator().next();
+      assertThat(row.getFloat("v")).isEqualTo(3.0f);
+      assertThat(row.getFloat(0)).isEqualTo(3.0f);
+    }
+  }
+}