// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#include <assert.h>
#include <string.h>
#include <stdio.h>

#include "erl_nif.h"
#include "fdb.h"

#include "atoms.h"
#include "resources.h"
#include "util.h"


// FoundationDB can only be intiailized once
// in a given OS process. By creating a random
// atom we can flag whether erlfdb was previously
// initialized in a given Erlang VM.
const char* SENTINEL = "faaffadf46e64b87bdd0fedbddd97b1a";


typedef enum _ErlFDBLibState
{
    ErlFDB_STATE_ERROR = 0,
    ErlFDB_LOADED,
    ErlFDB_API_SELECTED,
    ErlFDB_NETWORK_STARTING,
    ErlFDB_CONNECTED,
} ErlFDBLibState;


typedef struct _ErlFDBSt
{
    ErlFDBLibState lib_state;
    ErlNifTid network_tid;
    ErlNifMutex* lock;
    ErlNifCond* cond;
} ErlFDBSt;


static void*
erlfdb_network_thread(void* arg)
{
    ErlFDBSt* st = (ErlFDBSt*) arg;
    fdb_error_t* err = enif_alloc(sizeof(fdb_error_t));

    enif_mutex_lock(st->lock);

    st->lib_state = ErlFDB_NETWORK_STARTING;

    enif_cond_signal(st->cond);
    enif_mutex_unlock(st->lock);

    *err = fdb_run_network();

    return (void*) err;
}


static void
erlfdb_future_cb(FDBFuture* fdb_future, void* data)
{
    ErlFDBFuture* future = (ErlFDBFuture*) data;
    ErlNifEnv* caller;
    ERL_NIF_TERM msg;

    // FoundationDB callbacks can fire from the thread
    // that created them. Check if we were actually
    // submitted to the network thread or not so that
    // we pass the correct environment to enif_send
    if(enif_thread_type() == ERL_NIF_THR_UNDEFINED) {
        caller = NULL;
    } else {
        caller = future->pid_env;
    }

    enif_mutex_lock(future->lock);

    if(!future->cancelled) {
        msg = T2(future->msg_env, future->msg_ref, ATOM_ready);
        enif_send(caller, &(future->pid), future->msg_env, msg);
    }

    enif_mutex_unlock(future->lock);

    // We're now done with this future which means we need
    // to release our handle to it. See erlfdb_create_future
    // for more on why this happens here.

    enif_release_resource(future);

    return;
}


static ERL_NIF_TERM
erlfdb_create_future(ErlNifEnv* env, FDBFuture* future, ErlFDBFutureType ftype)
{
    ErlFDBFuture* f;
    ERL_NIF_TERM ref = enif_make_ref(env);
    ERL_NIF_TERM ret;
    fdb_error_t err;

    f = enif_alloc_resource(ErlFDBFutureRes, sizeof(ErlFDBFuture));
    f->future = future;
    f->ftype = ftype;
    enif_self(env, &(f->pid));
    f->pid_env = env;
    f->msg_env = enif_alloc_env();
    f->msg_ref = enif_make_copy(f->msg_env, ref);
    f->lock = enif_mutex_create("fdb:future_lock");
    f->cancelled = false;

    // This resource reference counting dance is a bit
    // awkward as erlfdb_future_cb can be called both
    // synchronously and asynchronously.
    //
    // At this point the reference count is 1

    enif_keep_resource(f);

    // The reference count is now 2

    err = fdb_future_set_callback(
            f->future,
            erlfdb_future_cb,
            (void*) f
        );

    if(err != 0) {
        // If we failed to set the future callback function
        // then we assume the callback was not invoked and
        // we have to release twice
        enif_release_resource(f);
        enif_release_resource(f);
        return erlfdb_erlang_error(env, err);
    }

    // At this point our callback may have been called
    // which means we could have a reference count
    // of 1 or 2

    ret = enif_make_resource(env, f);

    // enif_make_resource increases the ref count
    // so now our reference count is either 2 or 3

    enif_release_resource(f);

    // Finally our reference count is now either
    // 1 or 2 depending on whether the callback
    // has already fired. If the ref count is
    // 2 then Erlang has a reference and the network
    // thread has a reference. If its 1 then only
    // Erlang has a reference.

    return T3(env, ATOM_erlfdb_future, ref, ret);
}


static inline ERL_NIF_TERM
erlfdb_future_get_void(ErlNifEnv* env, ErlFDBFuture* f)
{
    fdb_error_t err;

    err = fdb_future_get_error(f->future);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    return ATOM_ok;
}


static inline ERL_NIF_TERM
erlfdb_future_get_int64(ErlNifEnv* env, ErlFDBFuture* f)
{
    int64_t fdb_res;
    ErlNifSInt64 nif_res;
    fdb_error_t err;

    err = fdb_future_get_int64(f->future, &fdb_res);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    nif_res = fdb_res;

    return enif_make_int64(env, nif_res);
}


static inline ERL_NIF_TERM
erlfdb_future_get_key(ErlNifEnv* env, ErlFDBFuture* f)
{
    const uint8_t* key;
    int len;
    unsigned char* buf;
    ERL_NIF_TERM ret;
    fdb_error_t err;

    err = fdb_future_get_key(f->future, &key, &len);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    buf = enif_make_new_binary(env, len, &ret);
    memcpy(buf, key, len);

    return ret;
}


static inline ERL_NIF_TERM
erlfdb_future_get_value(ErlNifEnv* env, ErlFDBFuture* f)
{
    fdb_bool_t present;
    const uint8_t* val;
    int len;
    unsigned char* buf;
    ERL_NIF_TERM ret;
    fdb_error_t err;

    err = fdb_future_get_value(f->future, &present, &val, &len);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    if(!present) {
        return ATOM_not_found;
    }

    buf = enif_make_new_binary(env, len, &ret);
    memcpy(buf, val, len);

    return ret;
}


static inline ERL_NIF_TERM
erlfdb_future_get_string_array(ErlNifEnv* env, ErlFDBFuture* f)
{
    const char** strings;
    int count;
    unsigned char* buf;
    ERL_NIF_TERM bin;
    ERL_NIF_TERM ret;
    fdb_error_t err;
    int i;

    err = fdb_future_get_string_array(f->future, &strings, &count);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    ret = enif_make_list(env, 0);

    for(i = count; i > 0; i--) {
        buf = enif_make_new_binary(env, strlen(strings[i]), &bin);
        memcpy(buf, strings[i], strlen(strings[i]));
        ret = enif_make_list_cell(env, bin, ret);
    }

    return ret;
}


static inline ERL_NIF_TERM
erlfdb_future_get_keyvalue_array(ErlNifEnv* env, ErlFDBFuture* f)
{
    FDBKeyValue const* kvs;
    fdb_bool_t more;
    int count;
    unsigned char* buf;
    ERL_NIF_TERM key;
    ERL_NIF_TERM val;
    ERL_NIF_TERM ret;
    fdb_error_t err;
    int i;

    err = fdb_future_get_keyvalue_array(f->future, &kvs, &count, &more);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    ret = enif_make_list(env, 0);

    for(i = count - 1; i >= 0; i--) {
        buf = enif_make_new_binary(env, kvs[i].key_length, &key);
        memcpy(buf, kvs[i].key, kvs[i].key_length);
        buf = enif_make_new_binary(env, kvs[i].value_length, &val);
        memcpy(buf, kvs[i].value, kvs[i].value_length);
        ret = enif_make_list_cell(env, T2(env, key, val), ret);
    }

    if(more) {
        return T3(env, ret, enif_make_int(env, count), ATOM_true);
    } else {
        return T3(env, ret, enif_make_int(env, count), ATOM_false);
    }
}


static int
erlfdb_load(ErlNifEnv* env, void** priv, ERL_NIF_TERM num_schedulers)
{
    ErlFDBSt* st = (ErlFDBSt*) enif_alloc(sizeof(ErlFDBSt));

    erlfdb_init_atoms(env);

    if(!erlfdb_init_resources(env)) {
        return 1;
    }

    st->lock = enif_mutex_create("fdb:st_lock");
    st->cond = enif_cond_create("fdb:st_cond");

    st->lib_state = ErlFDB_LOADED;
    *priv = st;

    return 0;
}


static void
erlfdb_unload(ErlNifEnv* env, void* priv)
{
    ErlFDBSt* st = (ErlFDBSt*) priv;
    ErlFDBLibState lib_state = ErlFDB_STATE_ERROR;
    fdb_error_t err;
    void* tmp;
    fdb_error_t* net_err;
    int nif_err;

    enif_mutex_lock(st->lock);
    lib_state = st->lib_state;
    enif_mutex_unlock(st->lock);

    if(lib_state == ErlFDB_CONNECTED) {
        err = fdb_stop_network();
        assert(err == 0 && "Error disconnecting fdb client");

        nif_err = enif_thread_join(st->network_tid, &tmp);
        assert(nif_err == 0 && "Error joining network thread");

        net_err = (fdb_error_t*) tmp;
        assert(*net_err == 0 && "Error running network thread");
        enif_free(tmp);
    }

    enif_mutex_destroy(st->lock);
    enif_cond_destroy(st->cond);
    enif_free(priv);

    return;
}


static ERL_NIF_TERM
erlfdb_can_initialize(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ERL_NIF_TERM atom;

    if(st->lib_state != ErlFDB_LOADED) {
        return enif_make_badarg(env);
    }

    if(argc != 0) {
        return enif_make_badarg(env);
    }

    if(enif_make_existing_atom(env, SENTINEL, &atom, ERL_NIF_LATIN1)) {
        return ATOM_false;
    }

    enif_make_atom(env, SENTINEL);

    return ATOM_true;
}


static ERL_NIF_TERM
erlfdb_get_max_api_version(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    int vsn = fdb_get_max_api_version();

    if(argc != 0) {
        return enif_make_badarg(env);
    }

    return enif_make_int(env, vsn);
}


static ERL_NIF_TERM
erlfdb_select_api_version(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    int vsn;
    fdb_error_t err;

    if(st->lib_state != ErlFDB_LOADED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_int(env, argv[0], &vsn)) {
        return enif_make_badarg(env);
    }

    err = fdb_select_api_version(vsn);

    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    st->lib_state = ErlFDB_API_SELECTED;

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_network_set_option(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    FDBNetworkOption option;
    ErlNifBinary value;
    fdb_error_t err;

    if(st->lib_state != ErlFDB_API_SELECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 2) {
        return enif_make_badarg(env);
    }

    if(IS_ATOM(argv[0], local_address)) {
        option = FDB_NET_OPTION_LOCAL_ADDRESS;
    } else if(IS_ATOM(argv[0], cluster_file)) {
        option = FDB_NET_OPTION_CLUSTER_FILE;
    } else if(IS_ATOM(argv[0], trace_enable)) {
        option = FDB_NET_OPTION_TRACE_ENABLE;
    } else if(IS_ATOM(argv[0], trace_format)) {
        option = FDB_NET_OPTION_TRACE_FORMAT;
    } else if(IS_ATOM(argv[0], trace_roll_size)) {
        option = FDB_NET_OPTION_TRACE_ROLL_SIZE;
    } else if(IS_ATOM(argv[0], trace_max_logs_size)) {
        option = FDB_NET_OPTION_TRACE_MAX_LOGS_SIZE;
    } else if(IS_ATOM(argv[0], trace_log_group)) {
        option = FDB_NET_OPTION_TRACE_LOG_GROUP;
    } else if(IS_ATOM(argv[0], knob)) {
        option = FDB_NET_OPTION_KNOB;
    } else if(IS_ATOM(argv[0], tls_plugin)) {
        option = FDB_NET_OPTION_TLS_PLUGIN;
    } else if(IS_ATOM(argv[0], tls_cert_bytes)) {
        option = FDB_NET_OPTION_TLS_CERT_BYTES;
    } else if(IS_ATOM(argv[0], tls_cert_path)) {
        option = FDB_NET_OPTION_TLS_CERT_PATH;
    } else if(IS_ATOM(argv[0], tls_key_bytes)) {
        option = FDB_NET_OPTION_TLS_KEY_BYTES;
    } else if(IS_ATOM(argv[0], tls_key_path)) {
        option = FDB_NET_OPTION_TLS_KEY_PATH;
    } else if(IS_ATOM(argv[0], tls_verify_peers)) {
        option = FDB_NET_OPTION_TLS_VERIFY_PEERS;
    } else if(IS_ATOM(argv[0], client_buggify_enable)) {
        option = FDB_NET_OPTION_CLIENT_BUGGIFY_ENABLE;
    } else if(IS_ATOM(argv[0], client_buggify_disable)) {
        option = FDB_NET_OPTION_CLIENT_BUGGIFY_DISABLE;
    } else if(IS_ATOM(argv[0], client_buggify_section_activated_probability)) {
        option = FDB_NET_OPTION_CLIENT_BUGGIFY_SECTION_ACTIVATED_PROBABILITY;
    } else if(IS_ATOM(argv[0], client_buggify_section_fired_probability)) {
        option = FDB_NET_OPTION_CLIENT_BUGGIFY_SECTION_FIRED_PROBABILITY;
    } else if(IS_ATOM(argv[0], tls_ca_bytes)) {
        option = FDB_NET_OPTION_TLS_CA_BYTES;
    } else if(IS_ATOM(argv[0], tls_password)) {
        option = FDB_NET_OPTION_TLS_PASSWORD;
    } else if(IS_ATOM(argv[0], disable_multi_version_client_api)) {
        option = FDB_NET_OPTION_DISABLE_MULTI_VERSION_CLIENT_API;
    } else if(IS_ATOM(argv[0], callbacks_on_external_threads)) {
        option = FDB_NET_OPTION_CALLBACKS_ON_EXTERNAL_THREADS;
    } else if(IS_ATOM(argv[0], external_client_library)) {
        option = FDB_NET_OPTION_EXTERNAL_CLIENT_LIBRARY;
    } else if(IS_ATOM(argv[0], external_client_directory)) {
        option = FDB_NET_OPTION_EXTERNAL_CLIENT_DIRECTORY;
    } else if(IS_ATOM(argv[0], disable_local_client)) {
        option = FDB_NET_OPTION_DISABLE_LOCAL_CLIENT;
    } else if(IS_ATOM(argv[0], disable_client_statistics_logging)) {
        option = FDB_NET_OPTION_DISABLE_CLIENT_STATISTICS_LOGGING;
    } else if(IS_ATOM(argv[0], enable_slow_task_profiling)) {
        option = FDB_NET_OPTION_ENABLE_SLOW_TASK_PROFILING;
    } else {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[1], &value)) {
        return enif_make_badarg(env);
    }

    err = fdb_network_set_option(option, (uint8_t*) value.data, value.size);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_setup_network(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    fdb_error_t err;

    if(st->lib_state != ErlFDB_API_SELECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 0) {
        return enif_make_badarg(env);
    }

    err = fdb_setup_network();
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    if(enif_thread_create(
            "fdb:network_thread",
            &(st->network_tid),
            erlfdb_network_thread,
            (void*) st,
            NULL
        ) != 0) {
        return enif_make_badarg(env);
    }

    enif_mutex_lock(st->lock);

    while(st->lib_state != ErlFDB_NETWORK_STARTING) {
        enif_cond_wait(st->cond, st->lock);
    }

    enif_mutex_unlock(st->lock);

    st->lib_state = ErlFDB_CONNECTED;

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_future_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBFutureRes, &res)) {
        return enif_make_badarg(env);
    }
    future = (ErlFDBFuture*) res;

    enif_mutex_lock(future->lock);

    future->cancelled = true;
    fdb_future_cancel(future->future);

    enif_mutex_unlock(future->lock);

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_future_silence(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBFutureRes, &res)) {
        return enif_make_badarg(env);
    }
    future = (ErlFDBFuture*) res;

    enif_mutex_lock(future->lock);

    future->cancelled = true;

    enif_mutex_unlock(future->lock);

    return ATOM_ok;
}

static ERL_NIF_TERM
erlfdb_future_is_ready(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBFutureRes, &res)) {
        return enif_make_badarg(env);
    }
    future = (ErlFDBFuture*) res;

    if(fdb_future_is_ready(future->future)) {
        return ATOM_true;
    } else {
        return ATOM_false;
    }
}


static ERL_NIF_TERM
erlfdb_future_get_error(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBFuture* future;
    fdb_error_t err;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBFutureRes, &res)) {
        return enif_make_badarg(env);
    }
    future = (ErlFDBFuture*) res;

    err = fdb_future_get_error(future->future);

    return T2(env, ATOM_erlfdb_error, enif_make_int(env, err));
}


static ERL_NIF_TERM
erlfdb_future_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBFuture* f;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBFutureRes, &res)) {
        return enif_make_badarg(env);
    }
    f = (ErlFDBFuture*) res;

    if(f->ftype == ErlFDB_FT_VOID) {
        return erlfdb_future_get_void(env, f);
    } else if(f->ftype == ErlFDB_FT_INT64) {
        return erlfdb_future_get_int64(env, f);
    } else if(f->ftype == ErlFDB_FT_KEY) {
        return erlfdb_future_get_key(env, f);
    } else if(f->ftype == ErlFDB_FT_VALUE) {
        return erlfdb_future_get_value(env, f);
    } else if(f->ftype == ErlFDB_FT_STRING_ARRAY) {
        return erlfdb_future_get_string_array(env, f);
    } else if(f->ftype == ErlFDB_FT_KEYVALUE_ARRAY) {
        return erlfdb_future_get_keyvalue_array(env, f);
    }

    return enif_raise_exception(env, ATOM_invalid_future_type);
}


static ERL_NIF_TERM
erlfdb_create_database(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlNifBinary bin;
    FDBDatabase* database;
    fdb_error_t err;
    ErlFDBDatabase* d;
    ERL_NIF_TERM ret;


    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[0], &bin)) {
        return enif_make_badarg(env);
    }

    if(bin.size < 1 || bin.data[bin.size - 1] != 0) {
        return enif_make_badarg(env);
    }

    err = fdb_create_database((const char*) bin.data, &database);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    d = enif_alloc_resource(ErlFDBDatabaseRes, sizeof(ErlFDBDatabase));
    d->database = database;

    ret = enif_make_resource(env, d);
    enif_release_resource(d);

    return T2(env, ATOM_erlfdb_database, ret);
}


static ERL_NIF_TERM
erlfdb_database_set_option(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBDatabase* d;
    FDBDatabaseOption option;
    ErlNifBinary value;
    fdb_error_t err;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 3) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBDatabaseRes, &res)) {
        return enif_make_badarg(env);
    }
    d = (ErlFDBDatabase*) res;

    if(IS_ATOM(argv[1], location_cache_size)) {
        option = FDB_DB_OPTION_LOCATION_CACHE_SIZE;
    } else if(IS_ATOM(argv[1], max_watches)) {
        option = FDB_DB_OPTION_MAX_WATCHES;
    } else if(IS_ATOM(argv[1], machine_id)) {
        option = FDB_DB_OPTION_MACHINE_ID;
    } else if(IS_ATOM(argv[1], datacenter_id)) {
        option = FDB_DB_OPTION_DATACENTER_ID;
    } else if(IS_ATOM(argv[1], read_your_writes_enable)) {
        option = FDB_DB_OPTION_SNAPSHOT_RYW_ENABLE;
    } else if(IS_ATOM(argv[1], read_your_writes_disable)) {
        option = FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE;
    } else if(IS_ATOM(argv[1], transaction_logging_max_field_length)) {
        option = FDB_DB_OPTION_TRANSACTION_LOGGING_MAX_FIELD_LENGTH;
    } else if(IS_ATOM(argv[1], timeout)) {
        option = FDB_DB_OPTION_TRANSACTION_TIMEOUT;
    } else if(IS_ATOM(argv[1], retry_limit)) {
        option = FDB_DB_OPTION_TRANSACTION_RETRY_LIMIT;
    } else if(IS_ATOM(argv[1], max_retry_delay)) {
        option = FDB_DB_OPTION_TRANSACTION_MAX_RETRY_DELAY;
    } else if(IS_ATOM(argv[1], size_limit)) {
        option = FDB_DB_OPTION_TRANSACTION_SIZE_LIMIT;
    } else if(IS_ATOM(argv[1], causal_read_risky)) {
        option = FDB_DB_OPTION_TRANSACTION_CAUSAL_READ_RISKY;
    } else if(IS_ATOM(argv[1], include_port_in_address)) {
        option = FDB_DB_OPTION_TRANSACTION_INCLUDE_PORT_IN_ADDRESS;
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[2], &value)) {
        return enif_make_badarg(env);
    }

    err = fdb_database_set_option(
            d->database,
            option,
            (uint8_t*) value.data,
            value.size
        );

    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_database_create_transaction(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBDatabase* d;
    ErlFDBTransaction* t;
    FDBTransaction* transaction;
    ErlNifPid pid;
    ERL_NIF_TERM ret;
    void* res;
    fdb_error_t err;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBDatabaseRes, &res)) {
        return enif_make_badarg(env);
    }
    d = (ErlFDBDatabase*) res;

    err = fdb_database_create_transaction(d->database, &transaction);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    t = enif_alloc_resource(ErlFDBTransactionRes, sizeof(ErlFDBTransaction));
    t->transaction = transaction;

    enif_self(env, &pid);
    t->owner = enif_make_pid(env, &pid);

    t->txid = 0;
    t->read_only = true;
    t->writes_allowed = true;
    t->has_watches = false;

    ret = enif_make_resource(env, t);
    enif_release_resource(t);
    return T2(env, ATOM_erlfdb_transaction, ret);
}


static ERL_NIF_TERM
erlfdb_transaction_set_option(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    FDBTransactionOption option;
    ErlNifBinary value;
    fdb_error_t err;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 3) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(IS_ATOM(argv[1], allow_writes)) {
        t->writes_allowed = true;
        return ATOM_ok;
    } else if (IS_ATOM(argv[1], disallow_writes)) {
        if(!t->read_only) {
            return enif_make_badarg(env);
        }
        t->writes_allowed = false;
        return ATOM_ok;
    }


    if(IS_ATOM(argv[1], causal_write_risky)) {
        option = FDB_TR_OPTION_CAUSAL_WRITE_RISKY;
    } else if(IS_ATOM(argv[1], causal_read_risky)) {
        option = FDB_TR_OPTION_CAUSAL_READ_RISKY;
    } else if(IS_ATOM(argv[1], causal_read_disable)) {
        option = FDB_TR_OPTION_CAUSAL_READ_DISABLE;
    } else if(IS_ATOM(argv[1], include_port_in_address)) {
        option = FDB_TR_OPTION_INCLUDE_PORT_IN_ADDRESS;
    } else if(IS_ATOM(argv[1], next_write_no_write_conflict_range)) {
        option = FDB_TR_OPTION_NEXT_WRITE_NO_WRITE_CONFLICT_RANGE;
    } else if(IS_ATOM(argv[1], read_your_writes_disable)) {
        option = FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE;
    } else if(IS_ATOM(argv[1], read_ahead_disable)) {
        option = FDB_TR_OPTION_READ_AHEAD_DISABLE;
    } else if(IS_ATOM(argv[1], durability_datacenter)) {
        option = FDB_TR_OPTION_DURABILITY_DATACENTER;
    } else if(IS_ATOM(argv[1], durability_risky)) {
        option = FDB_TR_OPTION_DURABILITY_RISKY;
    } else if(IS_ATOM(argv[1], durability_dev_null_is_web_scale)) {
        option = FDB_TR_OPTION_DURABILITY_DEV_NULL_IS_WEB_SCALE;
    } else if(IS_ATOM(argv[1], priority_system_immediate)) {
        option = FDB_TR_OPTION_PRIORITY_SYSTEM_IMMEDIATE;
    } else if(IS_ATOM(argv[1], priority_batch)) {
        option = FDB_TR_OPTION_PRIORITY_BATCH;
    } else if(IS_ATOM(argv[1], initialize_new_database)) {
        option = FDB_TR_OPTION_INITIALIZE_NEW_DATABASE;
    } else if(IS_ATOM(argv[1], access_system_keys)) {
        option = FDB_TR_OPTION_ACCESS_SYSTEM_KEYS;
    } else if(IS_ATOM(argv[1], read_system_keys)) {
        option = FDB_TR_OPTION_READ_SYSTEM_KEYS;
    } else if(IS_ATOM(argv[1], debug_retry_logging)) {
        option = FDB_TR_OPTION_DEBUG_RETRY_LOGGING;
    } else if(IS_ATOM(argv[1], transaction_logging_enable)) {
        option = FDB_TR_OPTION_TRANSACTION_LOGGING_ENABLE;
    } else if(IS_ATOM(argv[1], debug_transaction_identifier)) {
        option = FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER;
    } else if(IS_ATOM(argv[1], log_transaction)) {
        option = FDB_TR_OPTION_LOG_TRANSACTION;
    } else if(IS_ATOM(argv[1], transaction_logging_max_field_length)) {
        option = FDB_TR_OPTION_TRANSACTION_LOGGING_MAX_FIELD_LENGTH;
    } else if(IS_ATOM(argv[1], timeout)) {
        option = FDB_TR_OPTION_TIMEOUT;
    } else if(IS_ATOM(argv[1], retry_limit)) {
        option = FDB_TR_OPTION_RETRY_LIMIT;
    } else if(IS_ATOM(argv[1], max_retry_delay)) {
        option = FDB_TR_OPTION_MAX_RETRY_DELAY;
    } else if(IS_ATOM(argv[1], snapshot_ryw_enable)) {
        option = FDB_TR_OPTION_SNAPSHOT_RYW_ENABLE;
    } else if(IS_ATOM(argv[1], snapshot_ryw_disable)) {
        option = FDB_TR_OPTION_SNAPSHOT_RYW_DISABLE;
    } else if(IS_ATOM(argv[1], lock_aware)) {
        option = FDB_TR_OPTION_LOCK_AWARE;
    } else if(IS_ATOM(argv[1], used_during_commit_protection_disable)) {
        option = FDB_TR_OPTION_USED_DURING_COMMIT_PROTECTION_DISABLE;
    } else if(IS_ATOM(argv[1], read_lock_aware)) {
        option = FDB_TR_OPTION_READ_LOCK_AWARE;
    } else if(IS_ATOM(argv[1], size_limit)) {
        option = FDB_TR_OPTION_SIZE_LIMIT;
    } else if(IS_ATOM(argv[1], use_provisional_proxies)) {
        option = FDB_TR_OPTION_USE_PROVISIONAL_PROXIES;
#if FDB_API_VERSION > 620
    } else if(IS_ATOM(argv[1], report_conflicting_keys)) {
        option = FDB_TR_OPTION_REPORT_CONFLICTING_KEYS;
#endif
    } else {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[2], &value)) {
        return enif_make_badarg(env);
    }

    err = fdb_transaction_set_option(
            t->transaction,
            option,
            (uint8_t*) value.data,
            value.size
        );

    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_transaction_set_read_version(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifSInt64 erl_vsn;
    int64_t fdb_vsn;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 2) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!enif_get_int64(env, argv[1], &erl_vsn)) {
        return enif_make_badarg(env);
    }

    fdb_vsn = erl_vsn;

    fdb_transaction_set_read_version(t->transaction, fdb_vsn);

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_transaction_get_read_version(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    future = fdb_transaction_get_read_version(t->transaction);

    return erlfdb_create_future(env, future, ErlFDB_FT_INT64);
}


static ERL_NIF_TERM
erlfdb_transaction_get(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifBinary key;
    fdb_bool_t snapshot;
    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 3) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[1], &key)) {
        return enif_make_badarg(env);
    }

    if(!erlfdb_get_boolean(argv[2], &snapshot)) {
        return enif_make_badarg(env);
    }

    future = fdb_transaction_get(
            t->transaction,
            (uint8_t*) key.data,
            key.size,
            snapshot
        );

    return erlfdb_create_future(env, future, ErlFDB_FT_VALUE);
}


static ERL_NIF_TERM
erlfdb_transaction_get_key(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifBinary key;
    fdb_bool_t or_equal;
    int offset;
    fdb_bool_t snapshot;
    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 3) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!erlfdb_get_key_selector(env, argv[1], &key, &or_equal, &offset)) {
        return enif_make_badarg(env);
    }

    if(!erlfdb_get_boolean(argv[2], &snapshot)) {
        return enif_make_badarg(env);
    }

    future = fdb_transaction_get_key(
            t->transaction,
            (uint8_t*) key.data,
            key.size,
            or_equal,
            offset,
            snapshot
        );

    return erlfdb_create_future(env, future, ErlFDB_FT_KEY);
}


static ERL_NIF_TERM
erlfdb_transaction_get_addresses_for_key(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifBinary key;
    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 2) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[1], &key)) {
        return enif_make_badarg(env);
    }

    future = fdb_transaction_get_addresses_for_key(
            t->transaction,
            (uint8_t*) key.data,
            key.size
        );

    return erlfdb_create_future(env, future, ErlFDB_FT_STRING_ARRAY);
}

static ERL_NIF_TERM
erlfdb_transaction_get_range(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;

    ErlNifBinary skey;
    fdb_bool_t sor_equal;
    int soffset;

    ErlNifBinary ekey;
    fdb_bool_t eor_equal;
    int eoffset;

    int limit;
    int target_bytes;
    FDBStreamingMode mode;
    int iteration;
    fdb_bool_t snapshot;
    int reverse;

    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 9) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!erlfdb_get_key_selector(env, argv[1], &skey, &sor_equal, &soffset)) {
        return enif_make_badarg(env);
    }

    if(!erlfdb_get_key_selector(env, argv[2], &ekey, &eor_equal, &eoffset)) {
        return enif_make_badarg(env);
    }

    if(!enif_get_int(env, argv[3], &limit)) {
        return enif_make_badarg(env);
    }

    if(!enif_get_int(env, argv[4], &target_bytes)) {
        return enif_make_badarg(env);
    }

    if(IS_ATOM(argv[5], want_all)) {
        mode = FDB_STREAMING_MODE_WANT_ALL;
    } else if(IS_ATOM(argv[5], iterator)) {
        mode = FDB_STREAMING_MODE_ITERATOR;
    } else if(IS_ATOM(argv[5], exact)) {
        mode = FDB_STREAMING_MODE_EXACT;
    } else if(IS_ATOM(argv[5], small)) {
        mode = FDB_STREAMING_MODE_SMALL;
    } else if(IS_ATOM(argv[5], medium)) {
        mode = FDB_STREAMING_MODE_MEDIUM;
    } else if(IS_ATOM(argv[5], large)) {
        mode = FDB_STREAMING_MODE_LARGE;
    } else if(IS_ATOM(argv[5], serial)) {
        mode = FDB_STREAMING_MODE_SERIAL;
    } else if(!enif_get_int(env, argv[5], &mode)) {
        return enif_make_badarg(env);
    }

    if(!enif_get_int(env, argv[6], &iteration)) {
        return enif_make_badarg(env);
    }

    if(!erlfdb_get_boolean(argv[7], &snapshot)) {
        return enif_make_badarg(env);
    }

    if(!enif_get_int(env, argv[8], &reverse)) {
        return enif_make_badarg(env);
    }

    future = fdb_transaction_get_range(
            t->transaction,
            (uint8_t*) skey.data,
            skey.size,
            sor_equal,
            soffset,
            (uint8_t*) ekey.data,
            ekey.size,
            eor_equal,
            eoffset,
            limit,
            target_bytes,
            mode,
            iteration,
            snapshot,
            reverse
        );

    return erlfdb_create_future(env, future, ErlFDB_FT_KEYVALUE_ARRAY);
}


static ERL_NIF_TERM
erlfdb_transaction_set(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifBinary key;
    ErlNifBinary val;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 3) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!t->writes_allowed) {
        return enif_raise_exception(env, ATOM_writes_not_allowed);
    }

    if(!enif_inspect_binary(env, argv[1], &key)) {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[2], &val)) {
        return enif_make_badarg(env);
    }

    fdb_transaction_set(
            t->transaction,
            (uint8_t*) key.data,
            key.size,
            (uint8_t*) val.data,
            val.size
        );

    t->read_only = false;

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_transaction_clear(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifBinary key;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 2) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!t->writes_allowed) {
        return enif_raise_exception(env, ATOM_writes_not_allowed);
    }

    if(!enif_inspect_binary(env, argv[1], &key)) {
        return enif_make_badarg(env);
    }

    fdb_transaction_clear(t->transaction, (uint8_t*) key.data, key.size);

    t->read_only = false;

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_transaction_clear_range(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifBinary skey;
    ErlNifBinary ekey;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 3) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!t->writes_allowed) {
        return enif_raise_exception(env, ATOM_writes_not_allowed);
    }

    if(!enif_inspect_binary(env, argv[1], &skey)) {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[2], &ekey)) {
        return enif_make_badarg(env);
    }

    fdb_transaction_clear_range(
            t->transaction,
            (uint8_t*) skey.data,
            skey.size,
            (uint8_t*) ekey.data,
            ekey.size
        );

    t->read_only = false;

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_transaction_atomic_op(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifBinary key;
    FDBMutationType mtype;
    ErlNifBinary param;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 4) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!t->writes_allowed) {
        return enif_raise_exception(env, ATOM_writes_not_allowed);
    }

    if(!enif_inspect_binary(env, argv[1], &key)) {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[2], &param)) {
        return enif_make_badarg(env);
    }

    if(IS_ATOM(argv[3], add)) {
        mtype = FDB_MUTATION_TYPE_ADD;
    } else if(IS_ATOM(argv[3], bit_and)) {
        mtype = FDB_MUTATION_TYPE_BIT_AND;
    } else if(IS_ATOM(argv[3], bit_or)) {
        mtype = FDB_MUTATION_TYPE_BIT_OR;
    } else if(IS_ATOM(argv[3], bit_xor)) {
        mtype = FDB_MUTATION_TYPE_BIT_XOR;
    } else if(IS_ATOM(argv[3], append_if_fits)) {
        mtype = FDB_MUTATION_TYPE_APPEND_IF_FITS;
    } else if(IS_ATOM(argv[3], max)) {
        mtype = FDB_MUTATION_TYPE_MAX;
    } else if(IS_ATOM(argv[3], min)) {
        mtype = FDB_MUTATION_TYPE_MIN;
    } else if(IS_ATOM(argv[3], byte_min)) {
        mtype = FDB_MUTATION_TYPE_BYTE_MIN;
    } else if(IS_ATOM(argv[3], byte_max)) {
        mtype = FDB_MUTATION_TYPE_BYTE_MAX;
    } else if(IS_ATOM(argv[3], set_versionstamped_key)) {
        mtype = FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_KEY;
    } else if(IS_ATOM(argv[3], set_versionstamped_value)) {
        mtype = FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_VALUE;
    } else {
        return enif_make_badarg(env);
    }

    fdb_transaction_atomic_op(
            t->transaction,
            (uint8_t*) key.data,
            key.size,
            (uint8_t*) param.data,
            param.size,
            mtype
        );

    t->read_only = false;

    return ATOM_ok;
}

static ERL_NIF_TERM
erlfdb_transaction_commit(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    future = fdb_transaction_commit(t->transaction);

    return erlfdb_create_future(env, future, ErlFDB_FT_VOID);
}


static ERL_NIF_TERM
erlfdb_transaction_get_committed_version(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    int64_t fdb_vsn;
    ErlNifSInt64 erl_vsn;
    fdb_error_t err;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    err = fdb_transaction_get_committed_version(t->transaction, &fdb_vsn);
    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    erl_vsn = fdb_vsn;
    return enif_make_int64(env, erl_vsn);
}


static ERL_NIF_TERM
erlfdb_transaction_get_versionstamp(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    future = fdb_transaction_get_versionstamp(t->transaction);

    return erlfdb_create_future(env, future, ErlFDB_FT_KEY);
}


static ERL_NIF_TERM
erlfdb_transaction_watch(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifBinary key;
    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 2) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    // In order for the watches to fire the transaction must commit, even if it
    // is a read-only transaction. So if writes are explicitly disallowed, also
    // do not allow setting any watches.
    if(!t->writes_allowed) {
        return enif_raise_exception(env, ATOM_writes_not_allowed);
    }

    if(!enif_inspect_binary(env, argv[1], &key)) {
        return enif_make_badarg(env);
    }

    future = fdb_transaction_watch(
            t->transaction,
            (uint8_t*) key.data,
            key.size
        );

    t->has_watches = true;
    return erlfdb_create_future(env, future, ErlFDB_FT_VOID);
}


static ERL_NIF_TERM
erlfdb_transaction_on_error(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    int erl_err;
    fdb_error_t fdb_err;
    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 2) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!enif_get_int(env, argv[1], &erl_err)) {
        return enif_make_badarg(env);
    }

    fdb_err = erl_err;

    future = fdb_transaction_on_error(t->transaction, fdb_err);

    return erlfdb_create_future(env, future, ErlFDB_FT_VOID);
}


static ERL_NIF_TERM
erlfdb_transaction_reset(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    fdb_transaction_reset(t->transaction);

    t->txid = 0;
    t->read_only = true;
    t->has_watches = false;

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_transaction_cancel(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    fdb_transaction_cancel(t->transaction);

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_transaction_add_conflict_range(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    ErlNifBinary skey;
    ErlNifBinary ekey;
    FDBConflictRangeType rtype;
    fdb_error_t err;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 4) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[1], &skey)) {
        return enif_make_badarg(env);
    }

    if(!enif_inspect_binary(env, argv[2], &ekey)) {
        return enif_make_badarg(env);
    }

    if(IS_ATOM(argv[3], read)) {
        rtype = FDB_CONFLICT_RANGE_TYPE_READ;
    } else if(IS_ATOM(argv[3], write)) {
        if(!t->writes_allowed) {
            return enif_raise_exception(env, ATOM_writes_not_allowed);
        }
        rtype = FDB_CONFLICT_RANGE_TYPE_WRITE;
    } else {
        return enif_make_badarg(env);
    }

    err = fdb_transaction_add_conflict_range(
            t->transaction,
            (uint8_t*) skey.data,
            skey.size,
            (uint8_t*) ekey.data,
            ekey.size,
            rtype
        );

    if(err != 0) {
        return erlfdb_erlang_error(env, err);
    }

    if(rtype == FDB_CONFLICT_RANGE_TYPE_WRITE) {
        t->read_only = false;
    }

    return ATOM_ok;
}


static ERL_NIF_TERM
erlfdb_transaction_get_approximate_size(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    FDBFuture* future;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    future = fdb_transaction_get_approximate_size(t->transaction);

    return erlfdb_create_future(env, future, ErlFDB_FT_INT64);
}


static ERL_NIF_TERM
erlfdb_transaction_get_next_tx_id(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(t->txid > 65535) {
        return enif_make_badarg(env);
    }

    return enif_make_uint(env, t->txid++);
}


static ERL_NIF_TERM
erlfdb_transaction_is_read_only(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(t->read_only) {
        return ATOM_true;
    } else {
        return ATOM_false;
    }
}


static ERL_NIF_TERM
erlfdb_transaction_has_watches(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(t->has_watches) {
        return ATOM_true;
    } else {
        return ATOM_false;
    }
}


static ERL_NIF_TERM
erlfdb_transaction_get_writes_allowed(
        ErlNifEnv* env,
        int argc,
        const ERL_NIF_TERM argv[]
    )
{
    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
    ErlFDBTransaction* t;
    void* res;

    if(st->lib_state != ErlFDB_CONNECTED) {
        return enif_make_badarg(env);
    }

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
        return enif_make_badarg(env);
    }
    t = (ErlFDBTransaction*) res;

    if(!erlfdb_transaction_is_owner(env, t)) {
        return enif_make_badarg(env);
    }

    if(t->writes_allowed) {
        return ATOM_true;
    } else {
        return ATOM_false;
    }
}


static ERL_NIF_TERM
erlfdb_get_error(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    int erl_err;
    fdb_error_t fdb_err;
    const char* str_err;
    ERL_NIF_TERM ret;
    unsigned char* buf;

    if(argc != 1) {
        return enif_make_badarg(env);
    }

    if(!enif_get_int(env, argv[0], &erl_err)) {
        return enif_make_badarg(env);
    }

    fdb_err = erl_err;
    str_err = fdb_get_error(fdb_err);

    // No clue if this happens ever
    if(!str_err) {
        return enif_make_badarg(env);
    }

    buf = enif_make_new_binary(env, strlen(str_err), &ret);
    memcpy(buf, str_err, strlen(str_err));

    return ret;
}


static ERL_NIF_TERM
erlfdb_error_predicate(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    int erl_err;
    fdb_error_t fdb_err;
    FDBErrorPredicate pred;

    if(argc != 2) {
        return enif_make_badarg(env);
    }

    if(IS_ATOM(argv[0], retryable)) {
        pred = FDB_ERROR_PREDICATE_RETRYABLE;
    } else if(IS_ATOM(argv[0], maybe_committed)) {
        pred = FDB_ERROR_PREDICATE_MAYBE_COMMITTED;
    } else if(IS_ATOM(argv[0], retryable_not_committed)) {
        pred = FDB_ERROR_PREDICATE_RETRYABLE_NOT_COMMITTED;
    } else{
        return enif_make_badarg(env);
    }

    if(!enif_get_int(env, argv[1], &erl_err)) {
        return enif_make_badarg(env);
    }

    fdb_err = erl_err;

    if(fdb_error_predicate(pred, fdb_err)) {
        return ATOM_true;
    } else {
        return ATOM_false;
    }
}


#define NIF_FUNC(name, arity) {#name, arity, name}
static ErlNifFunc funcs[] =
{
    NIF_FUNC(erlfdb_can_initialize, 0),

    NIF_FUNC(erlfdb_get_max_api_version, 0),
    NIF_FUNC(erlfdb_select_api_version, 1),

    NIF_FUNC(erlfdb_network_set_option, 2),
    NIF_FUNC(erlfdb_setup_network, 0),

    NIF_FUNC(erlfdb_future_cancel, 1),
    NIF_FUNC(erlfdb_future_silence, 1),
    NIF_FUNC(erlfdb_future_is_ready, 1),
    NIF_FUNC(erlfdb_future_get_error, 1),
    NIF_FUNC(erlfdb_future_get, 1),

    NIF_FUNC(erlfdb_create_database, 1),
    NIF_FUNC(erlfdb_database_set_option, 3),
    NIF_FUNC(erlfdb_database_create_transaction, 1),

    NIF_FUNC(erlfdb_transaction_set_option, 3),
    NIF_FUNC(erlfdb_transaction_set_read_version, 2),
    NIF_FUNC(erlfdb_transaction_get_read_version, 1),
    NIF_FUNC(erlfdb_transaction_get, 3),
    NIF_FUNC(erlfdb_transaction_get_key, 3),
    NIF_FUNC(erlfdb_transaction_get_addresses_for_key, 2),
    NIF_FUNC(erlfdb_transaction_get_range, 9),
    NIF_FUNC(erlfdb_transaction_set, 3),
    NIF_FUNC(erlfdb_transaction_clear, 2),
    NIF_FUNC(erlfdb_transaction_clear_range, 3),
    NIF_FUNC(erlfdb_transaction_atomic_op, 4),
    NIF_FUNC(erlfdb_transaction_commit, 1),
    NIF_FUNC(erlfdb_transaction_get_committed_version, 1),
    NIF_FUNC(erlfdb_transaction_get_versionstamp, 1),
    NIF_FUNC(erlfdb_transaction_watch, 2),
    NIF_FUNC(erlfdb_transaction_on_error, 2),
    NIF_FUNC(erlfdb_transaction_reset, 1),
    NIF_FUNC(erlfdb_transaction_cancel, 1),
    NIF_FUNC(erlfdb_transaction_add_conflict_range, 4),
    NIF_FUNC(erlfdb_transaction_get_approximate_size, 1),
    NIF_FUNC(erlfdb_transaction_get_next_tx_id, 1),
    NIF_FUNC(erlfdb_transaction_is_read_only, 1),
    NIF_FUNC(erlfdb_transaction_has_watches, 1),
    NIF_FUNC(erlfdb_transaction_get_writes_allowed, 1),

    NIF_FUNC(erlfdb_get_error, 1),
    NIF_FUNC(erlfdb_error_predicate, 2)
};
#undef NIF_FUNC


ERL_NIF_INIT(erlfdb_nif, funcs, &erlfdb_load, NULL, NULL, &erlfdb_unload);
