blob: c2dbe937b812b7847df44957c662e67a716b6056 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
private static final int LARGE_OPERATIONS = 10;
public ITestAbfsNetworkStatistics() throws Exception {
}
/**
* Testing connections_made, send_request and bytes_send statistics in
* {@link AbfsRestOperation}.
*/
@Test
public void testAbfsHttpSendStatistics() throws IOException {
describe("Test to check correct values of statistics after Abfs http send "
+ "request is done.");
AzureBlobFileSystem fs = getFileSystem();
Map<String, Long> metricMap;
Path sendRequestPath = path(getMethodName());
String testNetworkStatsString = "http_send";
long connectionsMade, requestsSent, bytesSent;
metricMap = fs.getInstrumentationMap();
long connectionsMadeBeforeTest = metricMap
.get(CONNECTIONS_MADE.getStatName());
long requestsMadeBeforeTest = metricMap.get(SEND_REQUESTS.getStatName());
/*
* Creating AbfsOutputStream will result in 1 connection made and 1 send
* request.
*/
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
sendRequestPath)) {
out.write(testNetworkStatsString.getBytes());
/*
* Flushes all outstanding data (i.e. the current unfinished packet)
* from the client into the service on all DataNode replicas.
*/
out.hflush();
metricMap = fs.getInstrumentationMap();
/*
* Testing the network stats with 1 write operation.
*
* connections_made : (connections made above) + 2(flush).
*
* send_requests : (requests sent above) + 2(flush).
*
* bytes_sent : bytes wrote in AbfsOutputStream.
*/
long extraCalls = 0;
if (!fs.getAbfsStore()
.isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
// no network calls are made for hflush in case of appendblob
extraCalls++;
}
long expectedConnectionsMade = connectionsMadeBeforeTest + extraCalls + 2;
long expectedRequestsSent = requestsMadeBeforeTest + extraCalls + 2;
connectionsMade = assertAbfsStatistics(CONNECTIONS_MADE,
expectedConnectionsMade, metricMap);
requestsSent = assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
metricMap);
bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
testNetworkStatsString.getBytes().length, metricMap);
}
// To close the AbfsOutputStream 1 connection is made and 1 request is sent.
connectionsMade++;
requestsSent++;
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
sendRequestPath)) {
// Is a file overwrite case
long createRequestCalls = 1;
long createTriggeredGFSForETag = 0;
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
createRequestCalls += 1;
createTriggeredGFSForETag = 1;
}
for (int i = 0; i < LARGE_OPERATIONS; i++) {
out.write(testNetworkStatsString.getBytes());
/*
* 1 flush call would create 2 connections and 2 send requests.
* when hflush() is called it will essentially trigger append() and
* flush() inside AbfsRestOperation. Both of which calls
* executeHttpOperation() method which creates a connection and sends
* requests.
*/
out.hflush();
}
metricMap = fs.getInstrumentationMap();
/*
* Testing the network stats with Large amount of bytes sent.
*
* connections made : connections_made(Last assertion) + 1
* (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush).
*
* send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) +
* LARGE_OPERATIONS * 2(flush).
*
* bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes
* wrote each time).
*
*/
connectionsMade += createRequestCalls + createTriggeredGFSForETag;
requestsSent += createRequestCalls;
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
// no network calls are made for hflush in case of appendblob
assertAbfsStatistics(CONNECTIONS_MADE,
connectionsMade + LARGE_OPERATIONS, metricMap);
assertAbfsStatistics(SEND_REQUESTS,
requestsSent + LARGE_OPERATIONS, metricMap);
} else {
assertAbfsStatistics(CONNECTIONS_MADE,
connectionsMade + LARGE_OPERATIONS * 2, metricMap);
assertAbfsStatistics(SEND_REQUESTS,
requestsSent + LARGE_OPERATIONS * 2, metricMap);
}
assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
metricMap);
}
}
/**
* Testing get_response and bytes_received in {@link AbfsRestOperation}.
*/
@Test
public void testAbfsHttpResponseStatistics() throws IOException {
describe("Test to check correct values of statistics after Http "
+ "Response is processed.");
AzureBlobFileSystem fs = getFileSystem();
Path getResponsePath = path(getMethodName());
Map<String, Long> metricMap;
String testResponseString = "some response";
long getResponses, bytesReceived;
FSDataOutputStream out = null;
FSDataInputStream in = null;
try {
/*
* Creating a File and writing some bytes in it.
*
* get_response : 3(getFileSystem) + 1(OutputStream creation) + 2
* (Writing data in Data store).
*
*/
out = fs.create(getResponsePath);
out.write(testResponseString.getBytes());
out.hflush();
metricMap = fs.getInstrumentationMap();
long getResponsesBeforeTest = metricMap
.get(CONNECTIONS_MADE.getStatName());
// open would require 1 get response.
in = fs.open(getResponsePath);
// read would require 1 get response and also get the bytes received.
int result = in.read();
// Confirming read isn't -1.
LOG.info("Result of read operation : {}", result);
metricMap = fs.getInstrumentationMap();
/*
* Testing values of statistics after writing and reading a buffer.
*
* get_responses - (above operations) + 1(open()) + 1 (read()).;
*
* bytes_received - This should be equal to bytes sent earlier.
*/
long extraCalls = 0;
if (!fs.getAbfsStore()
.isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
// no network calls are made for hflush in case of appendblob
extraCalls++;
}
long expectedGetResponses = getResponsesBeforeTest + extraCalls + 1;
getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
expectedGetResponses, metricMap);
// Testing that bytes received is equal to bytes sent.
long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
bytesSend,
metricMap);
} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
}
// To close the streams 1 response is received.
getResponses++;
try {
/*
* Creating a file and writing buffer into it.
* This is a file recreate, so it will trigger
* 2 extra calls if create overwrite is off by default.
* Also recording the buffer for future read() call.
* This creating outputStream and writing requires 2 *
* (LARGE_OPERATIONS) get requests.
*/
StringBuilder largeBuffer = new StringBuilder();
out = fs.create(getResponsePath);
long createRequestCalls = 1;
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
createRequestCalls += 2;
}
for (int i = 0; i < LARGE_OPERATIONS; i++) {
out.write(testResponseString.getBytes());
out.hflush();
largeBuffer.append(testResponseString);
}
// Open requires 1 get_response.
in = fs.open(getResponsePath);
/*
* Reading the file which was written above. This read() call would
* read bytes equal to the bytes that was written above.
* Get response would be 1 only.
*/
in.read(0, largeBuffer.toString().getBytes(), 0,
largeBuffer.toString().getBytes().length);
metricMap = fs.getInstrumentationMap();
/*
* Testing the statistics values after writing and reading a large buffer.
*
* get_response : get_responses(Last assertion) + 1
* (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
* LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
* 1 (createOverwriteTriggeredGetForeTag).
*
* bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
* bytes wrote each time (bytes_received is equal to bytes wrote in the
* File).
*
*/
assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
metricMap);
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
// no network calls are made for hflush in case of appendblob
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
getResponses + 3 + LARGE_OPERATIONS, metricMap);
} else {
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
metricMap);
}
} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
}
}
/**
* Testing bytes_received counter value when a response failure occurs.
*/
@Test
public void testAbfsHttpResponseFailure() throws IOException {
describe("Test to check the values of bytes received counter when a "
+ "response is failed");
AzureBlobFileSystem fs = getFileSystem();
Path responseFailurePath = path(getMethodName());
Map<String, Long> metricMap;
FSDataOutputStream out = null;
try {
//create an empty file
out = fs.create(responseFailurePath);
//Re-creating the file again on same path with false overwrite, this
// would cause a response failure with status code 409.
out = fs.create(responseFailurePath, false);
} catch (FileAlreadyExistsException faee) {
metricMap = fs.getInstrumentationMap();
// Assert after catching the 409 error to check the counter values.
assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, 0, metricMap);
} finally {
IOUtils.cleanupWithLogger(LOG, out);
}
}
}