| #include "s3interface.h" |
| |
| // use destructor ~XMLContextHolder() to do the cleanup |
| class XMLContextHolder { |
| public: |
| XMLContextHolder(xmlParserCtxtPtr ctx) : context(ctx) { |
| } |
| ~XMLContextHolder() { |
| if (context != NULL) { |
| xmlFreeDoc(context->myDoc); |
| xmlFreeParserCtxt(context); |
| } |
| } |
| |
| private: |
| xmlParserCtxtPtr context; |
| }; |
| |
| S3InterfaceService::S3InterfaceService() : restfulService(NULL), params("") { |
| xmlInitParser(); |
| } |
| |
| S3InterfaceService::S3InterfaceService(const S3Params &p) : restfulService(NULL), params(p) { |
| xmlInitParser(); |
| } |
| |
| S3InterfaceService::~S3InterfaceService() { |
| // Cleanup function for the XML library. |
| xmlCleanupParser(); |
| } |
| |
| Response S3InterfaceService::getResponseWithRetries(const string &url, HTTPHeaders &headers, |
| uint64_t retries) { |
| string code; |
| string message; |
| uint64_t retry = retries; |
| |
| while (retry--) { |
| try { |
| return this->restfulService->get(url, headers); |
| } catch (S3ConnectionError &e) { |
| message = e.getMessage(); |
| if (S3QueryIsAbortInProgress()) { |
| S3_DIE(S3QueryAbort, "Downloading is interrupted"); |
| } |
| S3WARN("Failed to get a good response in GET from '%s', retrying ...", url.c_str()); |
| } catch (S3LogicError &e) { |
| code = e.getCode(); |
| if (code == "NoSuchKey") { |
| sleep(1); |
| continue; |
| } else { |
| throw e; |
| } |
| } |
| }; |
| |
| S3_DIE(S3FailedAfterRetry, url, retries, message); |
| }; |
| |
| Response S3InterfaceService::putResponseWithRetries(const string &url, HTTPHeaders &headers, |
| S3VectorUInt8 &data, uint64_t retries) { |
| string message; |
| uint64_t retry = retries; |
| |
| while (retry--) { |
| try { |
| return this->restfulService->put(url, headers, data); |
| } catch (S3ConnectionError &e) { |
| message = e.getMessage(); |
| if (S3QueryIsAbortInProgress()) { |
| S3_DIE(S3QueryAbort, "Uploading is interrupted"); |
| } |
| S3WARN("Failed to get a good response in PUT from '%s', retrying ...", url.c_str()); |
| } |
| }; |
| |
| S3_DIE(S3FailedAfterRetry, url, retries, message); |
| }; |
| |
| Response S3InterfaceService::postResponseWithRetries(const string &url, HTTPHeaders &headers, |
| const vector<uint8_t> &data, |
| uint64_t retries) { |
| string message; |
| uint64_t retry = retries; |
| |
| while (retry--) { |
| try { |
| return this->restfulService->post(url, headers, data); |
| } catch (S3ConnectionError &e) { |
| message = e.getMessage(); |
| if (S3QueryIsAbortInProgress()) { |
| S3_DIE(S3QueryAbort, "Uploading is interrupted"); |
| } |
| S3WARN("Failed to get a good response in POST from '%s', retrying ...", url.c_str()); |
| } |
| }; |
| |
| S3_DIE(S3FailedAfterRetry, url, retries, message); |
| } |
| |
| bool S3InterfaceService::isKeyExisted(ResponseCode code) { |
| return isSuccessfulResponse(code); |
| } |
| |
| ResponseCode S3InterfaceService::headResponseWithRetries(const string &url, HTTPHeaders &headers, |
| uint64_t retries) { |
| string message; |
| uint64_t retry = retries; |
| |
| while (retry--) { |
| try { |
| return this->restfulService->head(url, headers); |
| } catch (S3ConnectionError &e) { |
| message = e.getMessage(); |
| if (S3QueryIsAbortInProgress()) { |
| S3_DIE(S3QueryAbort, "Uploading is interrupted"); |
| } |
| |
| S3WARN("Failed to get a good response in HEAD from '%s', retrying ...", url.c_str()); |
| } |
| }; |
| |
| S3_DIE(S3FailedAfterRetry, url, retries, message); |
| } |
| |
| Response S3InterfaceService::deleteRequestWithRetries(const string &url, HTTPHeaders &headers, |
| uint64_t retries) { |
| string message; |
| uint64_t retry = retries; |
| |
| while (retry--) { |
| try { |
| return this->restfulService->deleteRequest(url, headers); |
| } catch (S3ConnectionError &e) { |
| message = e.getMessage(); |
| if (S3QueryIsAbortInProgress()) { |
| S3_DIE(S3QueryAbort, "Uploading is interrupted"); |
| } |
| S3WARN("Failed to get a good response in DELETE from '%s', retrying ...", url.c_str()); |
| } |
| }; |
| |
| S3_DIE(S3FailedAfterRetry, url, retries, message); |
| }; |
| |
| xmlParserCtxtPtr S3InterfaceService::getXMLContext(Response &response) { |
| xmlParserCtxtPtr xmlptr = |
| xmlCreatePushParserCtxt(NULL, NULL, (const char *)(response.getRawData().data()), |
| response.getRawData().size(), "getXMLContext.xml"); |
| if (xmlptr != NULL) { |
| xmlParseChunk(xmlptr, "", 0, 1); |
| } else { |
| S3ERROR("Failed to create XML parser context"); |
| } |
| return xmlptr; |
| } |
| |
| // require curl 7.17 higher |
| // http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html |
| Response S3InterfaceService::getBucketResponse(const S3Url &s3Url, const string &encodedQuery) { |
| HTTPHeaders headers; |
| headers.Add(HOST, s3Url.getHostForCurl()); |
| headers.Add(X_AMZ_CONTENT_SHA256, |
| "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"); // sha256hex |
| // of empty |
| // string |
| |
| SignRequestV4("GET", &headers, s3Url.getRegion(), s3Url.getPathForCurl(), encodedQuery, |
| this->params.getCred()); |
| |
| stringstream urlWithQuery; |
| urlWithQuery << s3Url.getFullUrlForCurl() << "?" << encodedQuery; |
| |
| return this->getResponseWithRetries(urlWithQuery.str(), headers); |
| } |
| |
| bool S3InterfaceService::parseBucketXML(ListBucketResult *result, xmlParserCtxtPtr xmlcontext, |
| string &marker) { |
| if ((result == NULL) || (xmlcontext == NULL)) { |
| return false; |
| } |
| |
| xmlNode *rootElement = xmlDocGetRootElement(xmlcontext->myDoc); |
| if (rootElement == NULL) { |
| S3WARN("Failed to parse returned xml of bucket list"); |
| return false; |
| } |
| |
| xmlNodePtr cur; |
| bool is_truncated = false; |
| char *content = NULL; |
| char *key = NULL; |
| char *key_size = NULL; |
| char *next_marker = NULL; |
| |
| cur = rootElement->xmlChildrenNode; |
| while (cur != NULL) { |
| if (key) { |
| xmlFree(key); |
| key = NULL; |
| } |
| |
| if (!xmlStrcmp(cur->name, (const xmlChar *)"NextMarker")) { |
| content = (char *)xmlNodeGetContent(cur); |
| if (content) { |
| next_marker = strdup(content); |
| xmlFree(content); |
| } |
| } |
| |
| if (!xmlStrcmp(cur->name, (const xmlChar *)"IsTruncated")) { |
| content = (char *)xmlNodeGetContent(cur); |
| if (content) { |
| if (!strncmp(content, "true", 4)) { |
| is_truncated = true; |
| } |
| xmlFree(content); |
| } |
| } |
| |
| if (!xmlStrcmp(cur->name, (const xmlChar *)"Name")) { |
| content = (char *)xmlNodeGetContent(cur); |
| if (content) { |
| result->Name = content; |
| xmlFree(content); |
| } |
| } |
| |
| if (!xmlStrcmp(cur->name, (const xmlChar *)"Prefix")) { |
| content = (char *)xmlNodeGetContent(cur); |
| if (content) { |
| result->Prefix = content; |
| xmlFree(content); |
| // content is not used anymore in this loop |
| content = NULL; |
| } |
| } |
| |
| if (!xmlStrcmp(cur->name, (const xmlChar *)"Contents")) { |
| xmlNodePtr contNode = cur->xmlChildrenNode; |
| uint64_t size = 0; |
| |
| while (contNode != NULL) { |
| // no memleak here, every content has only one Key/Size node |
| if (!xmlStrcmp(contNode->name, (const xmlChar *)"Key")) { |
| key = (char *)xmlNodeGetContent(contNode); |
| } |
| if (!xmlStrcmp(contNode->name, (const xmlChar *)"Size")) { |
| key_size = (char *)xmlNodeGetContent(contNode); |
| // Size of S3 file is a natural number, don't worry |
| size = (uint64_t)atoll((const char *)key_size); |
| } |
| contNode = contNode->next; |
| } |
| |
| if (key) { |
| if (size > 0) { // skip empty item |
| result->contents.emplace_back(key, size); |
| } else { |
| S3INFO("Size of \"%s\" is %" PRIu64 ", skip it", key, size); |
| } |
| } |
| |
| if (key_size) { |
| xmlFree(key_size); |
| key_size = NULL; |
| } |
| } |
| |
| cur = cur->next; |
| } |
| |
| if (is_truncated && next_marker) { |
| marker = next_marker; |
| } else { |
| marker = (is_truncated && key) ? key : ""; |
| } |
| |
| if (key) { |
| xmlFree(key); |
| } |
| |
| return true; |
| } |
| |
| // ListBucket lists all keys in given bucket with given prefix. |
| // |
| // Return NULL when there is failure due to network instability or |
| // service unstable, so that caller could retry. |
| // |
| // Caller should delete returned object. |
| ListBucketResult S3InterfaceService::listBucket(S3Url &s3Url) { |
| ListBucketResult result; |
| |
| string marker = ""; |
| string encodedPrefix = s3Url.getPrefix(); |
| FindAndReplace(encodedPrefix, "/", "%2F"); |
| do { |
| // To get next set(up to 1000) keys in one iteration. |
| // S3 requires query parameters specified alphabetically. |
| |
| // marker and prefix are used as the values of query parameters here |
| // so URI encode their whole string, "/" also. |
| |
| // transfer /bucket/prefix to /bucket/?prefix=prefix because we need to "GET" a real thing |
| stringstream querySs; |
| if (!marker.empty()) { |
| querySs << "marker=" << UriEncode(marker); |
| } |
| |
| if (!encodedPrefix.empty()) { |
| querySs << (marker.empty() ? "prefix=" : "&prefix=") << encodedPrefix; |
| } |
| |
| s3Url.setPrefix(""); |
| string queryStr = querySs.str(); |
| |
| Response resp = getBucketResponse(s3Url, queryStr); |
| |
| if (resp.getStatus() == RESPONSE_OK) { |
| xmlParserCtxtPtr xmlContext = getXMLContext(resp); |
| XMLContextHolder holder(xmlContext); |
| if (parseBucketXML(&result, xmlContext, marker)) { |
| continue; |
| } |
| } else if (resp.getStatus() == RESPONSE_ERROR) { |
| S3MessageParser s3msg(resp); |
| S3_DIE(S3LogicError, s3msg.getCode(), s3msg.getMessage()); |
| } else { |
| S3_DIE(S3RuntimeError, "unexpected response status"); |
| } |
| |
| return result; |
| } while (!marker.empty()); |
| |
| return result; |
| } |
| |
| uint64_t S3InterfaceService::fetchData(uint64_t offset, S3VectorUInt8 &data, uint64_t len, |
| const S3Url &s3Url) { |
| HTTPHeaders headers; |
| |
| char rangeBuf[S3_RANGE_HEADER_STRING_LEN] = {0}; |
| snprintf(rangeBuf, sizeof(rangeBuf), "bytes=%" PRIu64 "-%" PRIu64, offset, offset + len - 1); |
| headers.Add(HOST, s3Url.getHostForCurl()); |
| headers.Add(RANGE, rangeBuf); |
| headers.Add(X_AMZ_CONTENT_SHA256, |
| "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"); // sha256hex |
| // of empty |
| // string |
| |
| SignRequestV4("GET", &headers, s3Url.getRegion(), s3Url.getPathForCurl(), "", |
| this->params.getCred()); |
| |
| Response resp = this->getResponseWithRetries(s3Url.getFullUrlForCurl(), headers); |
| if (resp.getStatus() == RESPONSE_OK) { |
| data.swap(resp.getRawData()); |
| S3_CHECK_OR_DIE(data.size() == len, S3PartialResponseError, len, data.size()); |
| return data.size(); |
| } else if (resp.getStatus() == RESPONSE_ERROR) { |
| S3MessageParser s3msg(resp); |
| S3_DIE(S3LogicError, s3msg.getCode(), s3msg.getMessage()); |
| } else { |
| S3_DIE(S3RuntimeError, "unexpected response status"); |
| } |
| } |
| |
| S3CompressionType S3InterfaceService::checkCompressionType(const S3Url &s3Url) { |
| string ext = s3Url.getExtension(); |
| if (ext == ".deflate") { |
| return S3_COMPRESSION_DEFLATE; |
| } |
| |
| HTTPHeaders headers; |
| |
| char rangeBuf[S3_RANGE_HEADER_STRING_LEN] = {0}; |
| snprintf(rangeBuf, sizeof(rangeBuf), "bytes=%d-%d", 0, S3_MAGIC_BYTES_NUM - 1); |
| |
| headers.Add(HOST, s3Url.getHostForCurl()); |
| headers.Add(RANGE, rangeBuf); |
| headers.Add(X_AMZ_CONTENT_SHA256, |
| "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"); // sha256hex |
| // of empty |
| // string |
| |
| SignRequestV4("GET", &headers, s3Url.getRegion(), s3Url.getPathForCurl(), "", |
| this->params.getCred()); |
| |
| Response resp = this->getResponseWithRetries(s3Url.getFullUrlForCurl(), headers); |
| if (resp.getStatus() == RESPONSE_OK) { |
| S3VectorUInt8 &responseData = resp.getRawData(); |
| if (responseData.size() < S3_MAGIC_BYTES_NUM) { |
| return S3_COMPRESSION_PLAIN; |
| } |
| |
| S3_CHECK_OR_DIE(responseData.size() == S3_MAGIC_BYTES_NUM, S3PartialResponseError, |
| S3_MAGIC_BYTES_NUM, responseData.size()); |
| |
| if ((responseData[0] == 0x1f) && (responseData[1] == 0x8b)) { |
| return S3_COMPRESSION_GZIP; |
| } |
| } else if (resp.getStatus() == RESPONSE_ERROR) { |
| S3MessageParser s3msg(resp); |
| S3_DIE(S3LogicError, s3msg.getCode(), s3msg.getMessage()); |
| } else { |
| S3_DIE(S3RuntimeError, "unexpected response status"); |
| } |
| |
| return S3_COMPRESSION_PLAIN; |
| } |
| |
| bool S3InterfaceService::checkKeyExistence(const S3Url &s3Url) { |
| HTTPHeaders headers; |
| |
| headers.Add(HOST, s3Url.getHostForCurl()); |
| headers.Add(X_AMZ_CONTENT_SHA256, |
| "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"); // sha256hex |
| // of empty |
| // string |
| |
| SignRequestV4("HEAD", &headers, s3Url.getRegion(), s3Url.getPathForCurl(), "", |
| this->params.getCred()); |
| |
| return isKeyExisted(headResponseWithRetries(s3Url.getFullUrlForCurl(), headers)); |
| } |
| |
| string S3InterfaceService::getUploadId(const S3Url &s3Url) { |
| HTTPHeaders headers; |
| |
| headers.Add(HOST, s3Url.getHostForCurl()); |
| headers.Disable(CONTENTTYPE); |
| headers.Disable(CONTENTLENGTH); |
| headers.Add(X_AMZ_CONTENT_SHA256, |
| "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"); // sha256hex |
| // of empty |
| // string |
| if (params.getSSEType() == SSE_S3) { |
| headers.Add(X_AMZ_SERVER_SIDE_ENCRYPTION, "AES256"); |
| } |
| |
| SignRequestV4("POST", &headers, s3Url.getRegion(), s3Url.getPathForCurl(), |
| "uploads=", this->params.getCred()); |
| |
| stringstream urlWithQuery; |
| urlWithQuery << s3Url.getFullUrlForCurl() << "?uploads"; |
| |
| Response resp = this->postResponseWithRetries(urlWithQuery.str(), headers, vector<uint8_t>()); |
| S3MessageParser s3msg(resp); |
| |
| if (resp.getStatus() == RESPONSE_OK) { |
| return s3msg.parseS3Tag("UploadId"); |
| } else if (resp.getStatus() == RESPONSE_ERROR) { |
| S3MessageParser s3msg(resp); |
| S3_DIE(S3LogicError, s3msg.getCode(), s3msg.getMessage()); |
| } else { |
| S3_DIE(S3RuntimeError, "unexpected response status"); |
| } |
| } |
| |
| string S3InterfaceService::uploadPartOfData(S3VectorUInt8 &data, const S3Url &s3Url, |
| uint64_t partNumber, const string &uploadId) { |
| HTTPHeaders headers; |
| stringstream queryString; |
| |
| headers.Add(HOST, s3Url.getHostForCurl()); |
| |
| char contentSha256[SHA256_DIGEST_STRING_LENGTH]; // 65 |
| sha256_hex((const char *)data.data(), data.size(), contentSha256); |
| headers.Add(X_AMZ_CONTENT_SHA256, contentSha256); |
| |
| headers.Add(CONTENTTYPE, "text/plain"); |
| // headers.Add(CONTENTLENGTH, std::to_string((unsigned long long)data.size())); |
| |
| queryString << "partNumber=" << partNumber << "&uploadId=" << uploadId; |
| |
| SignRequestV4("PUT", &headers, s3Url.getRegion(), s3Url.getPathForCurl(), queryString.str(), |
| this->params.getCred()); |
| |
| stringstream urlWithQuery; |
| urlWithQuery << s3Url.getFullUrlForCurl() << "?partNumber=" << partNumber |
| << "&uploadId=" << uploadId; |
| |
| Response resp = this->putResponseWithRetries(urlWithQuery.str(), headers, data); |
| if (resp.getStatus() == RESPONSE_OK) { |
| string headers(resp.getRawHeaders().begin(), resp.getRawHeaders().end()); |
| string toSearch = "etag: "; |
| |
| auto res = |
| std::search(headers.begin(), headers.end(), toSearch.begin(), toSearch.end(), |
| [](char ch1, char ch2) { return std::tolower(ch1) == std::tolower(ch2); }); |
| |
| if (res == headers.end()) { |
| S3_DIE(S3RuntimeError, "Response does not contain etag in the header"); |
| } |
| |
| uint64_t etagStartPos = res - headers.begin() + toSearch.length(); |
| string etagToEnd = headers.substr(etagStartPos); |
| // RFC 2616 states "HTTP/1.1 defines the sequence CR LF as the end-of-line |
| // marker for all protocol elements except the entity-body" |
| uint64_t etagStrLen = etagToEnd.find("\r"); |
| |
| return etagToEnd.substr(0, etagStrLen); |
| } else if (resp.getStatus() == RESPONSE_ERROR) { |
| S3MessageParser s3msg(resp); |
| S3_DIE(S3LogicError, s3msg.getCode(), s3msg.getMessage()); |
| } else { |
| S3_DIE(S3RuntimeError, "unexpected response status"); |
| } |
| } |
| |
| bool S3InterfaceService::completeMultiPart(const S3Url &s3Url, const string &uploadId, |
| const vector<string> &etagArray) { |
| HTTPHeaders headers; |
| stringstream queryString; |
| |
| stringstream body; |
| |
| // check whether etagList or uploadId are empty, |
| // no matter we have done this in upper-layer or not. |
| if (etagArray.empty() || uploadId.empty()) { |
| return false; |
| } |
| |
| body << "<CompleteMultipartUpload>\n"; |
| for (uint64_t i = 0; i < etagArray.size(); ++i) { |
| body << " <Part>\n <PartNumber>" << i + 1 << "</PartNumber>\n <ETag>" << etagArray[i] |
| << "</ETag>\n </Part>\n"; |
| } |
| body << "</CompleteMultipartUpload>"; |
| |
| headers.Add(HOST, s3Url.getHostForCurl()); |
| headers.Add(CONTENTTYPE, "application/xml"); |
| |
| char contentSha256[SHA256_DIGEST_STRING_LENGTH]; // 65 |
| sha256_hex(body.str().c_str(), contentSha256); |
| headers.Add(X_AMZ_CONTENT_SHA256, contentSha256); |
| |
| headers.Add(CONTENTLENGTH, std::to_string((unsigned long long)body.str().length())); |
| |
| queryString << "uploadId=" << uploadId; |
| |
| SignRequestV4("POST", &headers, s3Url.getRegion(), s3Url.getPathForCurl(), queryString.str(), |
| this->params.getCred()); |
| |
| stringstream urlWithQuery; |
| urlWithQuery << s3Url.getFullUrlForCurl() << "?uploadId=" << uploadId; |
| |
| string bodyString = body.str(); |
| Response resp = this->postResponseWithRetries( |
| urlWithQuery.str(), headers, vector<uint8_t>(bodyString.begin(), bodyString.end())); |
| |
| if (resp.getStatus() == RESPONSE_OK) { |
| return true; |
| } else if (resp.getStatus() == RESPONSE_ERROR) { |
| S3MessageParser s3msg(resp); |
| S3_DIE(S3LogicError, s3msg.getCode(), s3msg.getMessage()); |
| } else { |
| S3_DIE(S3RuntimeError, "unexpected response status"); |
| } |
| } |
| |
| bool S3InterfaceService::abortUpload(const S3Url &s3Url, const string &uploadId) { |
| HTTPHeaders headers; |
| stringstream queryString; |
| |
| headers.Add(HOST, s3Url.getHostForCurl()); |
| headers.Disable(CONTENTTYPE); |
| headers.Disable(CONTENTLENGTH); |
| headers.Add(X_AMZ_CONTENT_SHA256, |
| "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"); // sha256hex |
| // of empty |
| // string |
| |
| queryString << "uploadId=" << uploadId; |
| |
| // DELETE /ObjectName?uploadId=UploadId HTTP/1.1 |
| SignRequestV4("DELETE", &headers, s3Url.getRegion(), s3Url.getPathForCurl(), queryString.str(), |
| this->params.getCred()); |
| |
| stringstream urlWithQuery; |
| urlWithQuery << s3Url.getFullUrlForCurl() << "?uploadId=" << uploadId; |
| |
| Response resp = this->deleteRequestWithRetries(urlWithQuery.str(), headers); |
| |
| if (resp.getStatus() == RESPONSE_OK) { |
| return true; |
| } else if (resp.getStatus() == RESPONSE_ERROR) { |
| S3MessageParser s3msg(resp); |
| S3_DIE(S3LogicError, s3msg.getCode(), s3msg.getMessage()); |
| } else { |
| S3_DIE(S3RuntimeError, "unexpected response status"); |
| } |
| } |