/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * cli.c is a example/sample C client shell for ZooKeeper. It contains
 * basic shell functionality which exercises some of the features of
 * the ZooKeeper C client API. It is not a full fledged client and is
 * not meant for production usage - see the Java client shell for a
 * fully featured shell.
 */

#include <zookeeper.h>
#include <proto.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#ifndef WIN32
#include <sys/time.h>
#include <unistd.h>
#include <sys/select.h>
#include <getopt.h>
#else
#include "winport.h"
//#include <io.h> <-- can't include, conflicting definitions of close()
int read(int _FileHandle, void * _DstBuf, unsigned int _MaxCharCount);
int write(int _Filehandle, const void * _Buf, unsigned int _MaxCharCount);
#define ctime_r(tctime, buffer) ctime_s (buffer, 40, tctime)
#include "win_getopt.h" // VisualStudio doesn't contain 'getopt'
#endif

#include <time.h>
#include <errno.h>
#include <assert.h>

#ifdef YCA
#include <yca/yca.h>
#endif

#define _LL_CAST_ (long long)

static zhandle_t *zh;
static clientid_t myid;
static const char *clientIdFile = 0;
struct timeval startTime;
static char *cmd;
static char *cert;
static int batchMode=0;

static int to_send=0;
static int sent=0;
static int recvd=0;

static int shutdownThisThing=0;

static __attribute__ ((unused)) void
printProfileInfo(struct timeval start, struct timeval end, int thres,
                 const char* msg)
{
  int delay=(end.tv_sec*1000+end.tv_usec/1000)-
    (start.tv_sec*1000+start.tv_usec/1000);
  if(delay>thres)
    fprintf(stderr,"%s: execution time=%dms\n",msg,delay);
}

static const char* state2String(int state){
  if (state == 0)
    return "CLOSED_STATE";
  if (state == ZOO_CONNECTING_STATE)
    return "CONNECTING_STATE";
  if (state == ZOO_ASSOCIATING_STATE)
    return "ASSOCIATING_STATE";
  if (state == ZOO_CONNECTED_STATE)
    return "CONNECTED_STATE";
  if (state == ZOO_READONLY_STATE)
    return "READONLY_STATE";
  if (state == ZOO_EXPIRED_SESSION_STATE)
    return "EXPIRED_SESSION_STATE";
  if (state == ZOO_AUTH_FAILED_STATE)
    return "AUTH_FAILED_STATE";

  return "INVALID_STATE";
}

static const char* type2String(int state){
  if (state == ZOO_CREATED_EVENT)
    return "CREATED_EVENT";
  if (state == ZOO_DELETED_EVENT)
    return "DELETED_EVENT";
  if (state == ZOO_CHANGED_EVENT)
    return "CHANGED_EVENT";
  if (state == ZOO_CHILD_EVENT)
    return "CHILD_EVENT";
  if (state == ZOO_SESSION_EVENT)
    return "SESSION_EVENT";
  if (state == ZOO_NOTWATCHING_EVENT)
    return "NOTWATCHING_EVENT";

  return "UNKNOWN_EVENT_TYPE";
}

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)
{
    /* Be careful using zh here rather than zzh - as this may be mt code
     * the client lib may call the watcher before zookeeper_init returns */

    fprintf(stderr, "Watcher %s state = %s", type2String(type), state2String(state));
    if (path && strlen(path) > 0) {
      fprintf(stderr, " for path %s", path);
    }
    fprintf(stderr, "\n");

    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            const clientid_t *id = zoo_client_id(zzh);
            if (myid.client_id == 0 || myid.client_id != id->client_id) {
                myid = *id;
                fprintf(stderr, "Got a new session id: 0x%llx\n",
                        _LL_CAST_ myid.client_id);
                if (clientIdFile) {
                    FILE *fh = fopen(clientIdFile, "w");
                    if (!fh) {
                        perror(clientIdFile);
                    } else {
                        int rc = fwrite(&myid, sizeof(myid), 1, fh);
                        if (rc != sizeof(myid)) {
                            perror("writing client id");
                        }
                        fclose(fh);
                    }
                }
            }
        } else if (state == ZOO_AUTH_FAILED_STATE) {
            fprintf(stderr, "Authentication failure. Shutting down...\n");
            zookeeper_close(zzh);
            shutdownThisThing=1;
            zh=0;
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
            fprintf(stderr, "Session expired. Shutting down...\n");
            zookeeper_close(zzh);
            shutdownThisThing=1;
            zh=0;
        }
    }
}

void dumpStat(const struct Stat *stat) {
    char tctimes[40];
    char tmtimes[40];
    time_t tctime;
    time_t tmtime;

    if (!stat) {
        fprintf(stderr,"null\n");
        return;
    }
    tctime = stat->ctime/1000;
    tmtime = stat->mtime/1000;

    ctime_r(&tmtime, tmtimes);
    ctime_r(&tctime, tctimes);

    fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
    "\tmtime=%s\tmzxid=%llx\n"
    "\tversion=%x\taversion=%x\n"
    "\tephemeralOwner = %llx\n",
     tctimes, _LL_CAST_ stat->czxid, tmtimes,
    _LL_CAST_ stat->mzxid,
    (unsigned int)stat->version, (unsigned int)stat->aversion,
    _LL_CAST_ stat->ephemeralOwner);
}

void my_string_completion(int rc, const char *name, const void *data) {
    fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc);
    if (!rc) {
        fprintf(stderr, "\tname = %s\n", name);
    }
    if(batchMode)
      shutdownThisThing=1;
}

void my_string_completion_free_data(int rc, const char *name, const void *data) {
    my_string_completion(rc, name, data);
    free((void*)data);
}

void my_string_stat_completion(int rc, const char *name, const struct Stat *stat,
        const void *data)  {
    my_string_completion(rc, name, data);
    dumpStat(stat);
}

void my_string_stat_completion_free_data(int rc, const char *name,
        const struct Stat *stat, const void *data)  {
    my_string_stat_completion(rc, name, stat, data);
    free((void*)data);
}

void my_data_completion(int rc, const char *value, int value_len,
        const struct Stat *stat, const void *data) {
    struct timeval tv;
    int sec;
    int usec;
    gettimeofday(&tv, 0);
    sec = tv.tv_sec - startTime.tv_sec;
    usec = tv.tv_usec - startTime.tv_usec;
    fprintf(stderr, "time = %d msec\n", sec*1000 + usec/1000);
    fprintf(stderr, "%s: rc = %d\n", (char*)data, rc);
    if (value) {
        fprintf(stderr, " value_len = %d\n", value_len);
        assert(write(2, value, value_len) == value_len);
    }
    fprintf(stderr, "\nStat:\n");
    dumpStat(stat);
    free((void*)data);
    if(batchMode)
      shutdownThisThing=1;
}

void my_silent_data_completion(int rc, const char *value, int value_len,
        const struct Stat *stat, const void *data) {
    recvd++;
    fprintf(stderr, "Data completion %s rc = %d\n",(char*)data,rc);
    free((void*)data);
    if (recvd==to_send) {
        fprintf(stderr,"Recvd %d responses for %d requests sent\n",recvd,to_send);
        if(batchMode)
          shutdownThisThing=1;
    }
}

void my_strings_completion(int rc, const struct String_vector *strings,
        const void *data) {
    struct timeval tv;
    int sec;
    int usec;
    int i;

    gettimeofday(&tv, 0);
    sec = tv.tv_sec - startTime.tv_sec;
    usec = tv.tv_usec - startTime.tv_usec;
    fprintf(stderr, "time = %d msec\n", sec*1000 + usec/1000);
    fprintf(stderr, "%s: rc = %d\n", (char*)data, rc);
    if (strings)
        for (i=0; i < strings->count; i++) {
            fprintf(stderr, "\t%s\n", strings->data[i]);
        }
    free((void*)data);
    gettimeofday(&tv, 0);
    sec = tv.tv_sec - startTime.tv_sec;
    usec = tv.tv_usec - startTime.tv_usec;
    fprintf(stderr, "time = %d msec\n", sec*1000 + usec/1000);
    if(batchMode)
      shutdownThisThing=1;
}

void my_strings_stat_completion(int rc, const struct String_vector *strings,
        const struct Stat *stat, const void *data) {
    my_strings_completion(rc, strings, data);
    dumpStat(stat);
    if(batchMode)
      shutdownThisThing=1;
}

void my_void_completion(int rc, const void *data) {
    fprintf(stderr, "%s: rc = %d\n", (char*)data, rc);
    free((void*)data);
    if(batchMode)
      shutdownThisThing=1;
}

void my_stat_completion(int rc, const struct Stat *stat, const void *data) {
    fprintf(stderr, "%s: rc = %d Stat:\n", (char*)data, rc);
    dumpStat(stat);
    free((void*)data);
    if(batchMode)
      shutdownThisThing=1;
}

void my_silent_stat_completion(int rc, const struct Stat *stat,
        const void *data) {
    //    fprintf(stderr, "State completion: [%s] rc = %d\n", (char*)data, rc);
    sent++;
    free((void*)data);
}

static void sendRequest(const char* data) {
    zoo_aset(zh, "/od", data, strlen(data), -1, my_silent_stat_completion,
            strdup("/od"));
    zoo_aget(zh, "/od", 1, my_silent_data_completion, strdup("/od"));
}

void od_completion(int rc, const struct Stat *stat, const void *data) {
    int i;
    fprintf(stderr, "od command response: rc = %d Stat:\n", rc);
    dumpStat(stat);
    // send a whole bunch of requests
    recvd=0;
    sent=0;
    to_send=200;
    for (i=0; i<to_send; i++) {
        char buf[4096*16];
        memset(buf, -1, sizeof(buf)-1);
        buf[sizeof(buf)-1]=0;
        sendRequest(buf);
    }
}

int startsWith(const char *line, const char *prefix) {
    int len = strlen(prefix);
    return strncmp(line, prefix, len) == 0;
}

static const char *hostPort;
static int verbose = 0;

void processline(char *line) {
    int rc;
    int async = ((line[0] == 'a') && !(startsWith(line, "addauth ")));
    if (async) {
        line++;
    }
    if (startsWith(line, "help")) {
      fprintf(stderr, "    create [+[e|s|c|t=ttl]] <path>\n");
      fprintf(stderr, "    create2 [+[e|s|c|t=ttl]] <path>\n");
      fprintf(stderr, "    delete <path>\n");
      fprintf(stderr, "    set <path> <data>\n");
      fprintf(stderr, "    get <path>\n");
      fprintf(stderr, "    ls <path>\n");
      fprintf(stderr, "    ls2 <path>\n");
      fprintf(stderr, "    sync <path>\n");
      fprintf(stderr, "    exists <path>\n");
      fprintf(stderr, "    wexists <path>\n");
      fprintf(stderr, "    myid\n");
      fprintf(stderr, "    verbose\n");
      fprintf(stderr, "    addauth <id> <scheme>\n");
      fprintf(stderr, "    config\n");
      fprintf(stderr, "    reconfig [-file <path> | -members <serverId=host:port1:port2;port3>,... | "
                          " -add <serverId=host:port1:port2;port3>,... | -remove <serverId>,...] [-version <version>]\n");
      fprintf(stderr, "    quit\n");
      fprintf(stderr, "\n");
      fprintf(stderr, "    prefix the command with the character 'a' to run the command asynchronously.\n");
      fprintf(stderr, "    run the 'verbose' command to toggle verbose logging.\n");
      fprintf(stderr, "    i.e. 'aget /foo' to get /foo asynchronously\n");
    } else if (startsWith(line, "verbose")) {
      if (verbose) {
        verbose = 0;
        zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
        fprintf(stderr, "logging level set to WARN\n");
      } else {
        verbose = 1;
        zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
        fprintf(stderr, "logging level set to DEBUG\n");
      }
    } else if (startsWith(line, "get ")) {
        line += 4;
        if (line[0] != '/') {
            fprintf(stderr, "Path must start with /, found: %s\n", line);
            return;
        }

        rc = zoo_aget(zh, line, 1, my_data_completion, strdup(line));
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
    } else if (strcmp(line, "config") == 0) {
       gettimeofday(&startTime, 0);
        rc = zoo_agetconfig(zh, 1, my_data_completion, strdup(ZOO_CONFIG_NODE));
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
   } else if (startsWith(line, "reconfig ")) {
           int syntaxError = 0;
           char* p = NULL;
           char* joining = NULL;
           char* leaving = NULL;
           char* members = NULL;
           size_t members_size = 0;

           int mode = 0; // 0 = not set, 1 = incremental, 2 = non-incremental
           int64_t version = -1;

           line += 9;
           p = strtok (strdup(line)," ");
           while (p != NULL) {
               if (strcmp(p, "-add")==0) {
                   p = strtok (NULL," ");
                   if (mode == 2 || p == NULL) {
                       syntaxError = 1;
                       break;
                   }
                   mode = 1;
                   joining = strdup(p);
               } else if (strcmp(p, "-remove")==0){
                   p = strtok (NULL," ");
                   if (mode == 2 || p == NULL) {
                       syntaxError = 1;
                       break;
                   }
                   mode = 1;
                   leaving = strdup(p);
               } else if (strcmp(p, "-members")==0) {
                   p = strtok (NULL," ");
                   if (mode == 1 || p == NULL) {
                       syntaxError = 1;
                       break;
                   }
                   mode = 2;
                   members = strdup(p);
               } else if (strcmp(p, "-file")==0){
                   FILE *fp = NULL;
                   p = strtok (NULL," ");
                   if (mode == 1 || p == NULL) {
                       syntaxError = 1;
                       break;
                   }
                   mode = 2;
                   fp = fopen(p, "r");
                   if (fp == NULL) {
                       fprintf(stderr, "Error reading file: %s\n", p);
                       syntaxError = 1;
                       break;
                   }
                   fseek(fp, 0L, SEEK_END);  /* Position to end of file */
                   members_size = ftell(fp);     /* Get file length */
                   rewind(fp);               /* Back to start of file */
                   members = calloc(members_size + 1, sizeof(char));
                   if(members == NULL )
                   {
                       fprintf(stderr, "\nInsufficient memory to read file: %s\n", p);
                       syntaxError = 1;
                       fclose(fp);
                       break;
                   }

                   /* Read the entire file into members
                    * NOTE: -- fread returns number of items successfully read
                    * not the number of bytes. We're requesting one item of
                    * members_size bytes. So we expect the return value here
                    * to be 1.
                    */
                   if (fread(members, members_size, 1, fp) != 1){
                       fprintf(stderr, "Error reading file: %s\n", p);
                       syntaxError = 1;
                       fclose(fp);
                        break;
                   }
                   fclose(fp);
               } else if (strcmp(p, "-version")==0){
                   p = strtok (NULL," ");
                   if (version != -1 || p == NULL){
                       syntaxError = 1;
                       break;
                   }
#ifdef WIN32
                   version = _strtoui64(p, NULL, 16);
#else
                   version = strtoull(p, NULL, 16);
#endif
                   if (version < 0) {
                       syntaxError = 1;
                       break;
                   }
               } else {
                   syntaxError = 1;
                   break;
               }
               p = strtok (NULL," ");
           }
           if (syntaxError) return;

           rc = zoo_areconfig(zh, joining, leaving, members, version, my_data_completion, strdup(line));
           free(joining);
           free(leaving);
           free(members);
           if (rc) {
               fprintf(stderr, "Error %d for %s\n", rc, line);
           }

   } else if (startsWith(line, "set ")) {
        char *ptr;
        line += 4;
        if (line[0] != '/') {
            fprintf(stderr, "Path must start with /, found: %s\n", line);
            return;
        }
        ptr = strchr(line, ' ');
        if (!ptr) {
            fprintf(stderr, "No data found after path\n");
            return;
        }
        *ptr = '\0';
        ptr++;
        rc = zoo_aset(zh, line, ptr, strlen(ptr), -1, my_stat_completion,
                strdup(line));
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
    } else if (startsWith(line, "ls ")) {
        line += 3;
        if (line[0] != '/') {
            fprintf(stderr, "Path must start with /, found: %s\n", line);
            return;
        }
        gettimeofday(&startTime, 0);
        rc= zoo_aget_children(zh, line, 1, my_strings_completion, strdup(line));
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
    } else if (startsWith(line, "ls2 ")) {
        line += 4;
        if (line[0] != '/') {
            fprintf(stderr, "Path must start with /, found: %s\n", line);
            return;
        }
        gettimeofday(&startTime, 0);
        rc= zoo_aget_children2(zh, line, 1, my_strings_stat_completion, strdup(line));
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
    } else if (startsWith(line, "create ") || startsWith(line, "create2 ")) {
        int mode = 0;
        int64_t ttl_value = -1;
        int is_create2 = startsWith(line, "create2 ");
        line += is_create2 ? 8 : 7;

        if (line[0] == '+') {
            int ephemeral = 0;
            int sequential = 0;
            int container = 0;
            int ttl = 0;

            line++;

            while (*line != ' ' && *line != '\0') {
                switch (*line) {
                case 'e':
                    ephemeral = 1;
                    break;
                case 's':
                    sequential = 1;
                    break;
                case 'c':
                    container = 1;
                    break;
                case 't':
                    ttl = 1;

                    line++;

                    if (*line != '=') {
                        fprintf(stderr, "Missing ttl value after +t\n");
                        return;
                    }

                    line++;

                    ttl_value = strtol(line, &line, 10);

                    if (ttl_value <= 0) {
                        fprintf(stderr, "ttl value must be a positive integer\n");
                        return;
                    }

                    // move back line pointer to the last digit
                    line--;

                    break;
                default:
                    fprintf(stderr, "Unknown option: %c\n", *line);
                    return;
                }

                line++;
            }

            if (ephemeral != 0 && sequential == 0 && container == 0 && ttl == 0) {
                mode = ZOO_EPHEMERAL;
            } else if (ephemeral == 0 && sequential != 0 && container == 0 && ttl == 0) {
                mode = ZOO_PERSISTENT_SEQUENTIAL;
            } else if (ephemeral != 0 && sequential != 0 && container == 0 && ttl == 0) {
                mode = ZOO_EPHEMERAL_SEQUENTIAL;
            } else if (ephemeral == 0 && sequential == 0 && container != 0 && ttl == 0) {
                mode = ZOO_CONTAINER;
            } else if (ephemeral == 0 && sequential == 0 && container == 0 && ttl != 0) {
                mode = ZOO_PERSISTENT_WITH_TTL;
            } else if (ephemeral == 0 && sequential != 0 && container == 0 && ttl != 0) {
                mode = ZOO_PERSISTENT_SEQUENTIAL_WITH_TTL;
            } else {
                fprintf(stderr, "Invalid mode.\n");
                return;
            }

            if (*line == ' ') {
                line++;
            }
        }
        if (line[0] != '/') {
            fprintf(stderr, "Path must start with /, found: %s\n", line);
            return;
        }
        fprintf(stderr, "Creating [%s] node (mode: %d)\n", line, mode);
//        {
//            struct ACL _CREATE_ONLY_ACL_ACL[] = {{ZOO_PERM_CREATE, ZOO_ANYONE_ID_UNSAFE}};
//            struct ACL_vector CREATE_ONLY_ACL = {1,_CREATE_ONLY_ACL_ACL};
//            rc = zoo_acreate(zh, line, "new", 3, &CREATE_ONLY_ACL, flags,
//                    my_string_completion, strdup(line));
//        }
        if (is_create2) {
          rc = zoo_acreate2_ttl(zh, line, "new", 3, &ZOO_OPEN_ACL_UNSAFE, mode, ttl_value,
                my_string_stat_completion_free_data, strdup(line));
        } else {
          rc = zoo_acreate_ttl(zh, line, "new", 3, &ZOO_OPEN_ACL_UNSAFE, mode, ttl_value,
                my_string_completion_free_data, strdup(line));
        }
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
    } else if (startsWith(line, "delete ")) {
        line += 7;
        if (line[0] != '/') {
            fprintf(stderr, "Path must start with /, found: %s\n", line);
            return;
        }
        rc = zoo_adelete(zh, line, -1, my_void_completion, strdup(line));
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
    } else if (startsWith(line, "sync ")) {
        line += 5;
        if (line[0] != '/') {
            fprintf(stderr, "Path must start with /, found: %s\n", line);
            return;
        }
        rc = zoo_async(zh, line, my_string_completion_free_data, strdup(line));
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
    } else if (startsWith(line, "wexists ")) {
#ifdef THREADED
        struct Stat stat;
#endif
        line += 8;
        if (line[0] != '/') {
            fprintf(stderr, "Path must start with /, found: %s\n", line);
            return;
        }
#ifndef THREADED
        rc = zoo_awexists(zh, line, watcher, (void*) 0, my_stat_completion, strdup(line));
#else
        rc = zoo_wexists(zh, line, watcher, (void*) 0, &stat);
#endif
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
    } else if (startsWith(line, "exists ")) {
#ifdef THREADED
        struct Stat stat;
#endif
        line += 7;
        if (line[0] != '/') {
            fprintf(stderr, "Path must start with /, found: %s\n", line);
            return;
        }
#ifndef THREADED
        rc = zoo_aexists(zh, line, 1, my_stat_completion, strdup(line));
#else
        rc = zoo_exists(zh, line, 1, &stat);
#endif
        if (rc) {
            fprintf(stderr, "Error %d for %s\n", rc, line);
        }
    } else if (strcmp(line, "myid") == 0) {
        printf("session Id = %llx\n", _LL_CAST_ zoo_client_id(zh)->client_id);
    } else if (strcmp(line, "reinit") == 0) {
        zookeeper_close(zh);
        // we can't send myid to the server here -- zookeeper_close() removes
        // the session on the server. We must start anew.
        zh = zookeeper_init(hostPort, watcher, 30000, 0, 0, 0);
    } else if (startsWith(line, "quit")) {
        fprintf(stderr, "Quitting...\n");
        shutdownThisThing=1;
    } else if (startsWith(line, "od")) {
        const char val[]="fire off";
        fprintf(stderr, "Overdosing...\n");
        rc = zoo_aset(zh, "/od", val, sizeof(val)-1, -1, od_completion, 0);
        if (rc)
            fprintf(stderr, "od command failed: %d\n", rc);
    } else if (startsWith(line, "addauth ")) {
      char *ptr;
      line += 8;
      ptr = strchr(line, ' ');
      if (ptr) {
        *ptr = '\0';
        ptr++;
      }
      zoo_add_auth(zh, line, ptr, ptr ? strlen(ptr) : 0, NULL, NULL);
    }
}
/*
 * Look for a command in the form 'cmd:command'.
 * Strips the prefix and copies the command in buf.
 * Returns 0 if the argument does not start with the prefix.
 * Returns -1 in case of error (command too long).
 * Returns 1 in case of success.
 * 
 */
int handleBatchMode(char* arg, char* buf, size_t maxlen) {    
    size_t cmdlen = strlen(arg);
    if (cmdlen < 4) {
        // too short
        return 0;
    }
    cmdlen -= 4;
    if(strncmp("cmd:", arg, 4) != 0){
        return 0;        
    }
    // we must leave space for the NULL terminator
    if (cmdlen >= maxlen) {
          fprintf(stderr,
                  "Command length %zu exceeds max length of %zu\n",
                  cmdlen,
                  maxlen);
          return -1;
    }
    memcpy(cmd, arg + 4, cmdlen);
    return 1;
}

int main(int argc, char **argv) {
    static struct option long_options[] = {
            {"host",     required_argument, NULL, 'h'}, //hostPort
            {"ssl",      required_argument, NULL, 's'}, //certificate files
            {"myid",     required_argument, NULL, 'm'}, //myId file
            {"cmd",      required_argument, NULL, 'c'}, //cmd
            {"readonly", no_argument, NULL, 'r'}, //read-only
            {"debug",    no_argument, NULL, 'd'}, //set log level to DEBUG from the beginning
            {NULL,      0,                 NULL, 0},
    };
#ifndef THREADED
    fd_set rfds, wfds, efds;
    int processed=0;
#endif
    char buffer[4096];
    char p[2048];
#ifdef YCA
    char *cert=0;
    char appId[64];
#endif
    int bufoff = 0;
    int flags;
    FILE *fh;

    int opt;
    int option_index = 0;

    verbose = 0;
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);

    flags = 0;
    while ((opt = getopt_long(argc, argv, "h:s:m:c:rd", long_options, &option_index)) != -1) {
        switch (opt) {
            case 'h':
                hostPort = strdup(optarg);
                break;
            case 'm':
                clientIdFile = strdup(optarg);
                fh = fopen(clientIdFile, "r");
                if (fh) {
                    if (fread(&myid, sizeof(myid), 1, fh) != sizeof(myid)) {
                        memset(&myid, 0, sizeof(myid));
                    }
                    fclose(fh);
                }
                break;
            case 'r':
                flags = ZOO_READONLY;
                break;
            case 'c':
                cmd = strdup(optarg);
                batchMode = 1;
                fprintf(stderr,"Batch mode: %s\n",cmd);
                break;
            case 's':
                cert = strdup(optarg);
                break;
            case 'd':
                verbose = 1;
                zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
                fprintf(stderr, "logging level set to DEBUG\n");
                break;
            case '?':
                if (optopt == 'h') {
                    fprintf (stderr, "Option -%c requires host list.\n", optopt);
                } else if (isprint (optopt)) {
                    fprintf (stderr, "Unknown option `-%c'.\n", optopt);
                } else {
                    fprintf (stderr,
                             "Unknown option character `\\x%x'.\n",
                             optopt);
                    return 1;
                }
        }
    }

    if (!hostPort) {
        fprintf(stderr,
                "\nUSAGE:    %s -h zk_host_1:port_1,zk_host_2:port_2,... [OPTIONAL ARGS]\n\n"
                "MANDATORY ARGS:\n"
                "-h, --host <host:port pairs>   Comma separated list of ZooKeeper host:port pairs\n\n"
                "OPTIONAL ARGS:\n"
                "-m, --myid <clientid file>     Path to the file contains the client ID\n"
                "-c, --cmd <command>            Command to execute, e.g. ls|ls2|create|create2|od|...\n"
                "-s, --ssl <ssl params>         Comma separated parameters to initiate SSL connection\n"
                "                                 e.g.: server_cert.crt,client_cert.crt,client_priv_key.pem,passwd\n"
                "-r, --readonly                 Connect in read-only mode\n"
                "-d, --debug                    Activate debug logs right from the beginning (you can also use the \n"
                "                                 command 'verbose' later to activate debug logs in the cli shell)\n\n",
                argv[0]);
        fprintf(stderr,
                "Version: ZooKeeper cli (c client) version %s\n",
                ZOO_VERSION);
        return 2;
    }

#ifdef YCA
    strcpy(appId,"yahoo.example.yca_test");
    cert = yca_get_cert_once(appId);
    if(cert!=0) {
        fprintf(stderr,"Certificate for appid [%s] is [%s]\n",appId,cert);
        strncpy(p,cert,sizeof(p)-1);
        free(cert);
    } else {
      fprintf(stderr,"Certificate for appid [%s] not found\n",appId);
      strcpy(p,"dummy");
    }
#else
    strcpy(p, "dummy");
#endif
    zoo_deterministic_conn_order(1); // enable deterministic order

#ifdef HAVE_OPENSSL_H
    if (!cert) {
        zh = zookeeper_init(hostPort, watcher, 30000, &myid, NULL, flags);
    } else {
        zh = zookeeper_init_ssl(hostPort, cert, watcher, 30000, &myid, NULL, flags);
    }
#else
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, NULL, flags);
#endif

    if (!zh) {
        return errno;
    }

#ifdef YCA
    if(zoo_add_auth(zh,"yca",p,strlen(p),0,0)!=ZOK)
    return 2;
#endif

#ifdef THREADED
    while(!shutdownThisThing) {
        int rc;
        int len = sizeof(buffer) - bufoff -1;
        if (len <= 0) {
            fprintf(stderr, "Can't handle lines that long!\n");
            exit(2);
        }
        rc = read(0, buffer+bufoff, len);
        if (rc <= 0) {
            fprintf(stderr, "bye\n");
            shutdownThisThing=1;
            break;
        }
        bufoff += rc;
        buffer[bufoff] = '\0';
        while (strchr(buffer, '\n')) {
            char *ptr = strchr(buffer, '\n');
            *ptr = '\0';
            processline(buffer);
            ptr++;
            memmove(buffer, ptr, strlen(ptr)+1);
            bufoff = 0;
        }
    }
#else
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    while (!shutdownThisThing) {
        int fd;
        int interest;
        int events;
        struct timeval tv;
        int rc;
        zookeeper_interest(zh, &fd, &interest, &tv);
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        if(batchMode && processed==0){
          //batch mode
          processline(cmd);
          processed=1;
        }
        if (!processed && FD_ISSET(0, &rfds)) {
            int rc;
            int len = sizeof(buffer) - bufoff -1;
            if (len <= 0) {
                fprintf(stderr, "Can't handle lines that long!\n");
                exit(2);
            }
            rc = read(0, buffer+bufoff, len);
            if (rc <= 0) {
                fprintf(stderr, "bye\n");
                break;
            }
            bufoff += rc;
            buffer[bufoff] = '\0';
            while (strchr(buffer, '\n')) {
                char *ptr = strchr(buffer, '\n');
                *ptr = '\0';
                processline(buffer);
                ptr++;
                memmove(buffer, ptr, strlen(ptr)+1);
                bufoff = 0;
            }
        }
        zookeeper_process(zh, events);
    }
#endif
    if (to_send!=0)
        fprintf(stderr,"Recvd %d responses for %d requests sent\n",recvd,sent);
    zookeeper_close(zh);
    return 0;
}
