blob: e41f950828d07dce677d06d5cd574f816a3daaa2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdlib.h>
#include <string.h>
#include <curl/curl.h>
#include "hdfs_http_client.h"
/* Mutex taken by initCurlGlobal() around the libcurl global initialization. */
static pthread_mutex_t curlInitMutex = PTHREAD_MUTEX_INITIALIZER;
/* Non-zero once initCurlGlobal() has performed the libcurl global initialization. */
static volatile int curlGlobalInited = 0;
/**
 * Translate an errno-style error code into a human-readable message.
 *
 * The lookup goes through the standard strerror() rather than the
 * sys_errlist/sys_nerr tables, which are deprecated and are no longer
 * declared by current C libraries.
 *
 * @param errnoval Error code to describe
 * @return "Invalid Error Code" for a negative code, "Unknown Error" if
 *         strerror() provides no message, otherwise the text returned by
 *         strerror(). The string must not be modified or freed by the
 *         caller and may be overwritten by a later strerror() call.
 */
const char *hdfs_strerror(int errnoval)
{
    const char *msg = NULL;

    if (errnoval < 0) {
        return "Invalid Error Code";
    }
    msg = strerror(errnoval);
    if (msg == NULL) {
        /* Defensive: some C libraries may return NULL for unknown codes. */
        return "Unknown Error";
    }
    return msg;
}
/**
 * Allocate a zero-filled ResponseBuffer.
 *
 * @param buffer Output parameter; receives the new buffer, or NULL when
 *               the allocation fails
 * @return 0 on success, ENOMEM when the allocation fails
 */
int initResponseBuffer(struct ResponseBuffer **buffer)
{
    struct ResponseBuffer *fresh = calloc(1, sizeof(struct ResponseBuffer));

    /* The output is always written, even on failure (it is NULL then). */
    *buffer = fresh;
    return (fresh != NULL) ? 0 : ENOMEM;
}
/**
 * Release a ResponseBuffer together with the content it holds.
 *
 * @param buffer Buffer to release; NULL is accepted and ignored
 */
void freeResponseBuffer(struct ResponseBuffer *buffer)
{
    if (buffer == NULL) {
        return;
    }
    /* free(NULL) is a no-op, so content needs no separate check. */
    free(buffer->content);
    free(buffer);
}
/**
 * Release a Response: its body buffer, its header buffer and the
 * Response structure itself.
 *
 * @param resp Response to release; NULL is accepted and ignored
 */
void freeResponse(struct Response *resp)
{
    if (resp == NULL) {
        return;
    }
    freeResponseBuffer(resp->body);
    freeResponseBuffer(resp->header);
    free(resp);
}
/**
 * libcurl write callback that appends the received data to the heap
 * buffer held by a ResponseBuffer, enlarging that buffer with realloc
 * whenever the space left is smaller than the incoming data. The stored
 * content is kept NUL-terminated.
 *
 * @param ptr     Data delivered by libcurl
 * @param size    Size of one data item
 * @param nmemb   Number of data items
 * @param rbuffer Buffer receiving the data
 * @return size * nmemb once the data is stored; 0 if there is no data,
 *         rbuffer is NULL, or the buffer could not be enlarged
 */
static size_t writefunc(void *ptr, size_t size,
                        size_t nmemb, struct ResponseBuffer *rbuffer)
{
    const size_t incoming = size * nmemb;

    if (incoming < 1) {
        return 0;
    }
    if (rbuffer == NULL) {
        fprintf(stderr,
                "ERROR: ResponseBuffer is NULL for the callback writefunc.\n");
        return 0;
    }
    if (rbuffer->remaining < incoming) {
        /* One extra byte is reserved for the trailing NUL. */
        void *grown = realloc(rbuffer->content,
                              rbuffer->offset + incoming + 1);
        if (grown == NULL) {
            fprintf(stderr, "ERROR: fail to realloc in callback writefunc.\n");
            return 0;
        }
        rbuffer->content = grown;
        rbuffer->remaining = incoming;
    }
    memcpy(rbuffer->content + rbuffer->offset, ptr, incoming);
    rbuffer->offset += incoming;
    rbuffer->remaining -= incoming;
    rbuffer->content[rbuffer->offset] = '\0';
    return incoming;
}
/**
 * libcurl write callback that stores the received data in a buffer
 * supplied by the user. The buffer is never reallocated: at most
 * rbuffer->remaining bytes are copied.
 *
 * @param ptr     Data delivered by libcurl
 * @param size    Size of one data item
 * @param nmemb   Number of data items
 * @param rbuffer User-provided buffer receiving the data
 * @return Number of bytes actually copied, which is smaller than
 *         size * nmemb when the buffer runs out of space; 0 if there is
 *         no data or no buffer
 */
static size_t writeFuncWithUserBuffer(void *ptr, size_t size,
                                      size_t nmemb,
                                      struct ResponseBuffer *rbuffer)
{
    const size_t incoming = size * nmemb;
    size_t accepted = 0;

    if (incoming < 1) {
        return 0;
    }
    if (rbuffer == NULL || rbuffer->content == NULL) {
        fprintf(stderr,
                "ERROR: buffer to read is NULL for the "
                "callback writeFuncWithUserBuffer.\n");
        return 0;
    }
    /* Never copy more than the space still left in the user's buffer. */
    if (rbuffer->remaining < incoming) {
        accepted = rbuffer->remaining;
    } else {
        accepted = incoming;
    }
    memcpy(rbuffer->content + rbuffer->offset, ptr, accepted);
    rbuffer->remaining -= accepted;
    rbuffer->offset += accepted;
    return accepted;
}
/**
 * Callback used by libcurl for writing data to remote peer.
 *
 * Copies pending data from the webhdfsBuffer passed as stream into the
 * buffer supplied by libcurl. The buffer state is only examined and
 * updated while writeMutex is held. While no data is pending and
 * closeFlag is not set, the callback signals transfer_finish and blocks
 * on newwrite_or_close.
 *
 * @param ptr    Destination buffer supplied by libcurl
 * @param size   Size of one data item
 * @param nmemb  Number of data items that fit into ptr
 * @param stream The struct webhdfsBuffer holding the data to send
 * @return Number of bytes copied into ptr; 0 when size * nmemb is 0,
 *         when closeFlag is set and nothing is pending, or when the
 *         buffer state is rejected by the final check. Per the
 *         CURLOPT_READFUNCTION documentation a return of 0 tells
 *         libcurl that the upload has ended.
 */
static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream)
{
struct webhdfsBuffer *wbuffer = NULL;
if (size * nmemb < 1) {
return 0;
}
wbuffer = stream;
pthread_mutex_lock(&wbuffer->writeMutex);
while (wbuffer->remaining == 0) {
/*
 * The current remaining bytes to write is 0:
 * check closeFlag to see whether the transfer needs to finish.
 * If yes, return 0; else, wait.
 */
if (wbuffer->closeFlag) { // We can close the transfer now
// For debug
fprintf(stderr, "CloseFlag is set, ready to close the transfer\n");
pthread_mutex_unlock(&wbuffer->writeMutex);
return 0;
} else {
// remaining == 0 but closeFlag is not set
// indicates that user's buffer has been transferred.
// Signal transfer_finish first, then wait on newwrite_or_close;
// pthread_cond_wait releases writeMutex while blocked and
// re-acquires it before returning.
pthread_cond_signal(&wbuffer->transfer_finish);
pthread_cond_wait(&wbuffer->newwrite_or_close,
&wbuffer->writeMutex);
}
}
// The loop only exits with remaining != 0.
if (wbuffer->remaining > 0 && !wbuffer->closeFlag) {
// Copy no more than libcurl's buffer can take (size * nmemb).
size_t copySize = wbuffer->remaining < size * nmemb ?
wbuffer->remaining : size * nmemb;
memcpy(ptr, wbuffer->wbuffer + wbuffer->offset, copySize);
wbuffer->offset += copySize;
wbuffer->remaining -= copySize;
pthread_mutex_unlock(&wbuffer->writeMutex);
return copySize;
} else {
// NOTE(review): this branch is also taken when closeFlag is set while
// data is still pending, in which case the message below is misleading.
// NOTE(review): %ld assumes remaining is a long - confirm against the
// declaration of struct webhdfsBuffer.
fprintf(stderr, "ERROR: webhdfsBuffer's remaining is %ld, "
"it should be a positive value!\n", wbuffer->remaining);
pthread_mutex_unlock(&wbuffer->writeMutex);
return 0;
}
}
/**
* Initialize the global libcurl environment
*/
static void initCurlGlobal()
{
if (!curlGlobalInited) {
pthread_mutex_lock(&curlInitMutex);
if (!curlGlobalInited) {
curl_global_init(CURL_GLOBAL_ALL);
curlGlobalInited = 1;
}
pthread_mutex_unlock(&curlInitMutex);
}
}
/**
 * Launch simple commands (commands without file I/O) and return response
 *
 * The response body and headers are collected by writefunc into buffers
 * allocated here. On success the caller owns *response and releases it
 * with freeResponse(). On failure everything allocated here, including
 * both buffers and their contents, is released and *response is set to
 * NULL.
 *
 * @param url Target URL
 * @param method HTTP method (GET/PUT/POST/DELETE)
 * @param followloc Whether or not need to set CURLOPT_FOLLOWLOCATION
 * @param response Response from remote service
 * @return 0 for success and non-zero value to indicate error:
 *         ENOMEM for allocation or curl handle failures, EINVAL for an
 *         unsupported method, EIO when the transfer fails
 */
static int launchCmd(const char *url, enum HttpHeader method,
                     enum Redirect followloc, struct Response **response)
{
    CURL *curl = NULL;
    CURLcode curlCode;
    int ret = 0;
    struct Response *resp = NULL;

    resp = calloc(1, sizeof(struct Response));
    if (!resp) {
        ret = ENOMEM;
        goto done;
    }
    ret = initResponseBuffer(&(resp->body));
    if (ret) {
        goto done;
    }
    ret = initResponseBuffer(&(resp->header));
    if (ret) {
        goto done;
    }
    initCurlGlobal();
    curl = curl_easy_init();
    if (!curl) {
        ret = ENOMEM; // curl_easy_init does not return error code,
                      // and most of its errors are caused by malloc()
        fprintf(stderr, "ERROR in curl_easy_init.\n");
        goto done;
    }
    /* Set callback function for reading data from remote service */
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
    curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
    curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
    curl_easy_setopt(curl, CURLOPT_URL, url);
    switch (method) {
    case GET:
        break;
    case PUT:
        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
        break;
    case POST:
        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
        break;
    case DELETE:
        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
        break;
    default:
        ret = EINVAL;
        fprintf(stderr, "ERROR: Invalid HTTP method\n");
        goto done;
    }
    if (followloc == YES) {
        /* libcurl reads this option as a long, so pass a long literal. */
        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
    }
    /* Now run the curl handler */
    curlCode = curl_easy_perform(curl);
    if (curlCode != CURLE_OK) {
        ret = EIO;
        fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
                url, curlCode, curl_easy_strerror(curlCode));
    }
done:
    if (curl != NULL) {
        curl_easy_cleanup(curl);
    }
    if (ret) {
        /* freeResponse also releases the body/header buffers, which a
         * plain free(resp) would leak; it accepts NULL. */
        freeResponse(resp);
        resp = NULL;
    }
    *response = resp;
    return ret;
}
/**
 * Launch the read request. The request is sent to the NameNode and then
 * redirected to corresponding DataNode
 *
 * The body is stored by writeFuncWithUserBuffer in the buffer the caller
 * placed in resp->body; headers are collected by writefunc into
 * resp->header. A CURLE_PARTIAL_FILE result is not treated as an error.
 * NOTE(review): resp->header is not validated here; writefunc returns 0
 * when it is handed a NULL buffer.
 *
 * @param url The URL for the read request
 * @param resp The response containing the buffer provided by user
 * @return 0 for success and non-zero value to indicate error:
 *         EINVAL when the user buffer is missing, ENOMEM when no curl
 *         handle could be created, EIO when the transfer fails
 */
static int launchReadInternal(const char *url, struct Response* resp)
{
    CURL *curl = NULL;
    CURLcode curlCode;
    int ret = 0;

    if (!resp || !resp->body || !resp->body->content) {
        fprintf(stderr,
                "ERROR: invalid user-provided buffer!\n");
        return EINVAL;
    }
    initCurlGlobal();
    /* get a curl handle */
    curl = curl_easy_init();
    if (!curl) {
        fprintf(stderr, "ERROR in curl_easy_init.\n");
        return ENOMEM;
    }
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFuncWithUserBuffer);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
    curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
    curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
    curl_easy_setopt(curl, CURLOPT_URL, url);
    /* libcurl reads this option as a long, so pass a long literal. */
    curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
    curlCode = curl_easy_perform(curl);
    if (curlCode != CURLE_OK && curlCode != CURLE_PARTIAL_FILE) {
        ret = EIO;
        fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
                url, curlCode, curl_easy_strerror(curlCode));
    }
    curl_easy_cleanup(curl);
    return ret;
}
/**
 * The function does the write operation by connecting to a DataNode.
 * The function keeps the connection with the DataNode until
 * the closeFlag is set. Whenever the current data has been sent out,
 * the function blocks waiting for further input from user or close.
 *
 * On success the caller owns *response and releases it with
 * freeResponse(). On every failure path the response, its buffers, the
 * header list and the curl handle are released and *response is set to
 * NULL.
 *
 * @param url URL of the remote DataNode
 * @param method PUT for create and POST for append
 * @param uploadBuffer Buffer storing user's data to write
 * @param response Response from remote service
 * @return 0 for success and non-zero value to indicate error:
 *         EINVAL for a NULL upload buffer or unsupported method, ENOMEM
 *         for allocation or curl handle failures, EIO when the transfer
 *         fails
 */
static int launchWrite(const char *url, enum HttpHeader method,
                       struct webhdfsBuffer *uploadBuffer,
                       struct Response **response)
{
    CURLcode curlCode;
    struct Response *resp = NULL;
    struct curl_slist *chunk = NULL;
    struct curl_slist *appended = NULL;
    CURL *curl = NULL;
    int ret = 0;

    if (!uploadBuffer) {
        fprintf(stderr, "ERROR: upload buffer is NULL!\n");
        ret = EINVAL;
        goto done;
    }
    initCurlGlobal();
    resp = calloc(1, sizeof(struct Response));
    if (!resp) {
        ret = ENOMEM;
        goto done;
    }
    ret = initResponseBuffer(&(resp->body));
    if (ret) {
        goto done;
    }
    ret = initResponseBuffer(&(resp->header));
    if (ret) {
        goto done;
    }
    // Connect to the datanode in order to create the lease in the namenode
    curl = curl_easy_init();
    if (!curl) {
        fprintf(stderr, "ERROR: failed to initialize the curl handle.\n");
        /* Go through the common cleanup so resp and its buffers are freed. */
        ret = ENOMEM;
        goto done;
    }
    curl_easy_setopt(curl, CURLOPT_URL, url);
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
    curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
    curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
    curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc);
    curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer);
    curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);

    /*
     * Build the custom header list. curl_slist_append returns NULL on
     * failure and leaves the existing list untouched, so the result is
     * checked before it replaces the list head.
     */
    appended = curl_slist_append(chunk, "Transfer-Encoding: chunked");
    if (!appended) {
        fprintf(stderr, "ERROR: failed to build the HTTP header list.\n");
        ret = ENOMEM;
        goto done;
    }
    chunk = appended;
    appended = curl_slist_append(chunk, "Expect:");
    if (!appended) {
        fprintf(stderr, "ERROR: failed to build the HTTP header list.\n");
        ret = ENOMEM;
        goto done;
    }
    chunk = appended;
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);

    switch (method) {
    case PUT:
        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
        break;
    case POST:
        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
        break;
    default:
        ret = EINVAL;
        fprintf(stderr, "ERROR: Invalid HTTP method\n");
        goto done;
    }
    curlCode = curl_easy_perform(curl);
    if (curlCode != CURLE_OK) {
        ret = EIO;
        fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
                url, curlCode, curl_easy_strerror(curlCode));
    }
done:
    /* Release the handle before the header list it refers to. */
    if (curl != NULL) {
        curl_easy_cleanup(curl);
    }
    if (chunk != NULL) {
        curl_slist_free_all(chunk);
    }
    if (ret) {
        /* freeResponse also releases the body/header buffers, which a
         * plain free(resp) would leak; it accepts NULL. */
        freeResponse(resp);
        resp = NULL;
    }
    *response = resp;
    return ret;
}
/**
 * Send the request at url as an HTTP PUT through launchCmd, without
 * following redirects. Presumably the directory-creation operation
 * (inferred from the name; the operation itself is carried by url).
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchMKDIR(const char *url, struct Response **resp)
{
return launchCmd(url, PUT, NO, resp);
}
/**
 * Send the request at url as an HTTP PUT through launchCmd, without
 * following redirects. Presumably the rename operation (inferred from
 * the name; the operation itself is carried by url).
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchRENAME(const char *url, struct Response **resp)
{
return launchCmd(url, PUT, NO, resp);
}
/**
 * Send the request at url as an HTTP GET through launchCmd, without
 * following redirects. NOTE(review): "GFS" presumably stands for a
 * get-file-status query - confirm against the callers.
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchGFS(const char *url, struct Response **resp)
{
return launchCmd(url, GET, NO, resp);
}
/**
 * Send the request at url as an HTTP GET through launchCmd, without
 * following redirects. Presumably the directory-listing operation
 * (inferred from the name; the operation itself is carried by url).
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchLS(const char *url, struct Response **resp)
{
return launchCmd(url, GET, NO, resp);
}
/**
 * Send the request at url as an HTTP PUT through launchCmd, without
 * following redirects. Presumably the permission-change operation
 * (inferred from the name; the operation itself is carried by url).
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchCHMOD(const char *url, struct Response **resp)
{
return launchCmd(url, PUT, NO, resp);
}
/**
 * Send the request at url as an HTTP PUT through launchCmd, without
 * following redirects. Presumably the ownership-change operation
 * (inferred from the name; the operation itself is carried by url).
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchCHOWN(const char *url, struct Response **resp)
{
return launchCmd(url, PUT, NO, resp);
}
/**
 * Send the request at url as an HTTP DELETE through launchCmd, without
 * following redirects.
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchDELETE(const char *url, struct Response **resp)
{
return launchCmd(url, DELETE, NO, resp);
}
/**
 * Run a read request through launchReadInternal. Unlike the launchCmd
 * based wrappers, resp is supplied by the caller and must already hold
 * the body buffer to read into (launchReadInternal rejects a missing
 * buffer with EINVAL).
 *
 * @param url The URL for the read request
 * @param resp Caller-provided response containing the user's buffer
 * @return The value returned by launchReadInternal (0 for success)
 */
int launchOPEN(const char *url, struct Response* resp)
{
return launchReadInternal(url, resp);
}
/**
 * Send the request at url as an HTTP PUT through launchCmd, without
 * following redirects. Presumably the set-times operation (inferred
 * from the name; the operation itself is carried by url).
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchUTIMES(const char *url, struct Response **resp)
{
return launchCmd(url, PUT, NO, resp);
}
/**
 * Send the request at url as an HTTP PUT through launchCmd, without
 * following redirects. NOTE(review): "Nn" presumably means the request
 * goes to the NameNode as the first step of a write - confirm against
 * the callers.
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchNnWRITE(const char *url, struct Response **resp)
{
return launchCmd(url, PUT, NO, resp);
}
/**
 * Send the request at url as an HTTP POST through launchCmd, without
 * following redirects. NOTE(review): "Nn" presumably means the request
 * goes to the NameNode as the first step of an append - confirm against
 * the callers.
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchNnAPPEND(const char *url, struct Response **resp)
{
return launchCmd(url, POST, NO, resp);
}
/**
 * Upload the data held in buffer to url with an HTTP PUT through
 * launchWrite (which documents PUT as the method used for create).
 *
 * @param url URL of the remote DataNode
 * @param buffer Buffer storing the user's data to write
 * @param resp Receives the response produced by launchWrite
 * @return The value returned by launchWrite (0 for success)
 */
int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer,
struct Response **resp)
{
return launchWrite(url, PUT, buffer, resp);
}
/**
 * Upload the data held in buffer to url with an HTTP POST through
 * launchWrite (which documents POST as the method used for append).
 *
 * @param url URL of the remote DataNode
 * @param buffer Buffer storing the user's data to write
 * @param resp Receives the response produced by launchWrite
 * @return The value returned by launchWrite (0 for success)
 */
int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer,
struct Response **resp)
{
return launchWrite(url, POST, buffer, resp);
}
/**
 * Send the request at url as an HTTP PUT through launchCmd, without
 * following redirects. Presumably the set-replication operation
 * (inferred from the name; the operation itself is carried by url).
 *
 * @param url Target URL
 * @param resp Receives the response produced by launchCmd
 * @return The value returned by launchCmd (0 for success)
 */
int launchSETREPLICATION(const char *url, struct Response **resp)
{
return launchCmd(url, PUT, NO, resp);
}