blob: e41f950828d07dce677d06d5cd574f816a3daaa2 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <stdlib.h>
#include <string.h>
#include <curl/curl.h>
#include "hdfs_http_client.h"
/*
 * Mutex paired with curlGlobalInited.
 * NOTE(review): presumably guards the one-time libcurl global initialization
 * in initCurlGlobal(); the lock/unlock calls are not visible in this view.
 */
static pthread_mutex_t curlInitMutex = PTHREAD_MUTEX_INITIALIZER;
/* Set to 1 by initCurlGlobal() once initialization has been performed. */
static volatile int curlGlobalInited = 0;
const char *hdfs_strerror(int errnoval)
const char *msg = NULL;
if (errnoval < 0 || errnoval >= sys_nerr) {
msg = "Invalid Error Code";
} else if (sys_errlist == NULL) {
msg = "Unknown Error";
} else {
msg = sys_errlist[errnoval];
return msg;
int initResponseBuffer(struct ResponseBuffer **buffer)
struct ResponseBuffer *info = NULL;
int ret = 0;
info = calloc(1, sizeof(struct ResponseBuffer));
if (!info) {
ret = ENOMEM;
*buffer = info;
return ret;
void freeResponseBuffer(struct ResponseBuffer *buffer)
if (buffer) {
if (buffer->content) {
buffer = NULL;
void freeResponse(struct Response *resp)
if (resp) {
resp = NULL;
* Callback used by libcurl for allocating local buffer and
* reading data to local buffer
static size_t writefunc(void *ptr, size_t size,
size_t nmemb, struct ResponseBuffer *rbuffer)
void *temp = NULL;
if (size * nmemb < 1) {
return 0;
if (!rbuffer) {
"ERROR: ResponseBuffer is NULL for the callback writefunc.\n");
return 0;
if (rbuffer->remaining < size * nmemb) {
temp = realloc(rbuffer->content, rbuffer->offset + size * nmemb + 1);
if (temp == NULL) {
fprintf(stderr, "ERROR: fail to realloc in callback writefunc.\n");
return 0;
rbuffer->content = temp;
rbuffer->remaining = size * nmemb;
memcpy(rbuffer->content + rbuffer->offset, ptr, size * nmemb);
rbuffer->offset += size * nmemb;
(rbuffer->content)[rbuffer->offset] = '\0';
rbuffer->remaining -= size * nmemb;
return size * nmemb;
* Callback used by libcurl for reading data into buffer provided by user,
* thus no need to reallocate buffer.
static size_t writeFuncWithUserBuffer(void *ptr, size_t size,
size_t nmemb, struct ResponseBuffer *rbuffer)
size_t toCopy = 0;
if (size * nmemb < 1) {
return 0;
if (!rbuffer || !rbuffer->content) {
"ERROR: buffer to read is NULL for the "
"callback writeFuncWithUserBuffer.\n");
return 0;
toCopy = rbuffer->remaining < (size * nmemb) ?
rbuffer->remaining : (size * nmemb);
memcpy(rbuffer->content + rbuffer->offset, ptr, toCopy);
rbuffer->offset += toCopy;
rbuffer->remaining -= toCopy;
return toCopy;
/**
 * Callback used by libcurl for writing data to remote peer
 */
// Upload callback: copies up to size * nmemb bytes of the pending user data
// (wbuffer->wbuffer + wbuffer->offset) into ptr and returns the number of
// bytes copied; returns 0 once closeFlag is set and nothing remains.
//
// NOTE(review): this block is incomplete in this view. The brace-only lines
// and comment delimiters are missing, and the "else, wait" branch of the loop
// below contains no statement. As shown, the loop would re-poll
// wbuffer->remaining without blocking or locking. Presumably the original
// blocks on a mutex/condition variable belonging to struct webhdfsBuffer;
// restore that code from the declaration of the struct before relying on
// this function.
static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream)
struct webhdfsBuffer *wbuffer = NULL;
if (size * nmemb < 1) {
return 0;
wbuffer = stream;
while (wbuffer->remaining == 0) {
* The current remainning bytes to write is 0,
* check closeFlag to see whether need to finish the transfer.
* if yes, return 0; else, wait
if (wbuffer->closeFlag) { // We can close the transfer now
//For debug
fprintf(stderr, "CloseFlag is set, ready to close the transfer\n");
return 0;
} else {
// remaining == 0 but closeFlag is not set
// indicates that user's buffer has been transferred
// NOTE(review): the wait for new data or close is missing here (see above).
if (wbuffer->remaining > 0 && !wbuffer->closeFlag) {
// Hand libcurl at most as many bytes as it asked for.
size_t copySize = wbuffer->remaining < size * nmemb ?
wbuffer->remaining : size * nmemb;
memcpy(ptr, wbuffer->wbuffer + wbuffer->offset, copySize);
wbuffer->offset += copySize;
wbuffer->remaining -= copySize;
return copySize;
} else {
// NOTE(review): "%ld" expects a long; confirm the type of
// wbuffer->remaining (it is compared against size_t values above).
fprintf(stderr, "ERROR: webhdfsBuffer's remaining is %ld, "
"it should be a positive value!\n", wbuffer->remaining);
return 0;
* Initialize the global libcurl environment
static void initCurlGlobal()
if (!curlGlobalInited) {
if (!curlGlobalInited) {
curlGlobalInited = 1;
* Launch simple commands (commands without file I/O) and return response
* @param url Target URL
* @param method HTTP method (GET/PUT/POST)
* @param followloc Whether or not need to set CURLOPT_FOLLOWLOCATION
* @param response Response from remote service
* @return 0 for success and non-zero value to indicate error
static int launchCmd(const char *url, enum HttpHeader method,
enum Redirect followloc, struct Response **response)
CURL *curl = NULL;
CURLcode curlCode;
int ret = 0;
struct Response *resp = NULL;
resp = calloc(1, sizeof(struct Response));
if (!resp) {
return ENOMEM;
ret = initResponseBuffer(&(resp->body));
if (ret) {
goto done;
ret = initResponseBuffer(&(resp->header));
if (ret) {
goto done;
curl = curl_easy_init();
if (!curl) {
ret = ENOMEM; // curl_easy_init does not return error code,
// and most of its errors are caused by malloc()
fprintf(stderr, "ERROR in curl_easy_init.\n");
goto done;
/* Set callback function for reading data from remote service */
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
curl_easy_setopt(curl, CURLOPT_URL, url);
switch(method) {
case GET:
case PUT:
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
case POST:
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
case DELETE:
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
ret = EINVAL;
fprintf(stderr, "ERROR: Invalid HTTP method\n");
goto done;
if (followloc == YES) {
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
/* Now run the curl handler */
curlCode = curl_easy_perform(curl);
if (curlCode != CURLE_OK) {
ret = EIO;
fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
url, curlCode, curl_easy_strerror(curlCode));
if (curl != NULL) {
if (ret) {
resp = NULL;
*response = resp;
return ret;
* Launch the read request. The request is sent to the NameNode and then
* redirected to corresponding DataNode
* @param url The URL for the read request
* @param resp The response containing the buffer provided by user
* @return 0 for success and non-zero value to indicate error
static int launchReadInternal(const char *url, struct Response* resp)
CURL *curl;
CURLcode curlCode;
int ret = 0;
if (!resp || !resp->body || !resp->body->content) {
"ERROR: invalid user-provided buffer!\n");
return EINVAL;
/* get a curl handle */
curl = curl_easy_init();
if (!curl) {
fprintf(stderr, "ERROR in curl_easy_init.\n");
return ENOMEM;
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFuncWithUserBuffer);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
curlCode = curl_easy_perform(curl);
if (curlCode != CURLE_OK && curlCode != CURLE_PARTIAL_FILE) {
ret = EIO;
fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
url, curlCode, curl_easy_strerror(curlCode));
return ret;
* The function does the write operation by connecting to a DataNode.
* The function keeps the connection with the DataNode until
* the closeFlag is set. Whenever the current data has been sent out,
* the function blocks waiting for further input from user or close.
* @param url URL of the remote DataNode
* @param method PUT for create and POST for append
* @param uploadBuffer Buffer storing user's data to write
* @param response Response from remote service
* @return 0 for success and non-zero value to indicate error
static int launchWrite(const char *url, enum HttpHeader method,
struct webhdfsBuffer *uploadBuffer,
struct Response **response)
CURLcode curlCode;
struct Response* resp = NULL;
struct curl_slist *chunk = NULL;
CURL *curl = NULL;
int ret = 0;
if (!uploadBuffer) {
fprintf(stderr, "ERROR: upload buffer is NULL!\n");
return EINVAL;
resp = calloc(1, sizeof(struct Response));
if (!resp) {
return ENOMEM;
ret = initResponseBuffer(&(resp->body));
if (ret) {
goto done;
ret = initResponseBuffer(&(resp->header));
if (ret) {
goto done;
// Connect to the datanode in order to create the lease in the namenode
curl = curl_easy_init();
if (!curl) {
fprintf(stderr, "ERROR: failed to initialize the curl handle.\n");
return ENOMEM;
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc);
curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer);
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
chunk = curl_slist_append(chunk, "Transfer-Encoding: chunked");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
chunk = curl_slist_append(chunk, "Expect:");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
switch(method) {
case PUT:
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
case POST:
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
ret = EINVAL;
fprintf(stderr, "ERROR: Invalid HTTP method\n");
goto done;
curlCode = curl_easy_perform(curl);
if (curlCode != CURLE_OK) {
ret = EIO;
fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
url, curlCode, curl_easy_strerror(curlCode));
if (chunk != NULL) {
if (curl != NULL) {
if (ret) {
resp = NULL;
*response = resp;
return ret;
int launchMKDIR(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO, resp);
int launchRENAME(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO, resp);
int launchGFS(const char *url, struct Response **resp)
return launchCmd(url, GET, NO, resp);
int launchLS(const char *url, struct Response **resp)
return launchCmd(url, GET, NO, resp);
int launchCHMOD(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO, resp);
int launchCHOWN(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO, resp);
int launchDELETE(const char *url, struct Response **resp)
return launchCmd(url, DELETE, NO, resp);
/**
 * Send the OPEN (read) request through launchReadInternal.
 *
 * @param url  The URL for the read request
 * @param resp Response prepared by the caller; resp->body->content must
 *             point to the buffer that receives the data
 * @return 0 for success and non-zero value to indicate error
 */
int launchOPEN(const char *url, struct Response* resp)
{
    return launchReadInternal(url, resp);
}
int launchUTIMES(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO, resp);
int launchNnWRITE(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO, resp);
int launchNnAPPEND(const char *url, struct Response **resp)
return launchCmd(url, POST, NO, resp);
int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer,
struct Response **resp)
return launchWrite(url, PUT, buffer, resp);
int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer,
struct Response **resp)
return launchWrite(url, POST, buffer, resp);
int launchSETREPLICATION(const char *url, struct Response **resp)
return launchCmd(url, PUT, NO, resp);