blob: f8351fa6b542c551ce79c82557d78ebe3162a6e3 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.http;
import java.nio.charset.StandardCharsets;
import org.apache.edgent.connectors.http.runtime.HttpRequester;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.function.UnaryOperator;
import org.apache.edgent.topology.TStream;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import com.google.gson.JsonObject;
/**
* HTTP streams.
*
*/
public class HttpStreams {
/**
* Make an HTTP GET request with JsonObject. <br>
*
* Method specifically works with JsonObjects. For each JsonObject in the stream,
* HTTP GET request is executed on provided uri. As a result, Response is added to
* the response TStream.
* <br>
*
* Sample usage:<br>
*
* <pre>
* {@code
* DirectProvider ep = new DirectProvider();
* Topology org.apache.edgent.org.apache.edgent.topology = ep.newTopology();
* final String url = "http://httpbin.org/get?";
*
* JsonObject request1 = new JsonObject();
* request1.addProperty("a", "abc");
* request1.addProperty("b", "42");
*
* TStream<JsonObject> stream = org.apache.edgent.org.apache.edgent.topology.collection(Arrays.asList(request1));
* TStream<JsonObject> rc = HttpStreams.getJson(stream,
* HttpClients::noAuthentication,
* t -> url + "a=" + t.get("a").getAsString() + "&b="
* + t.get("b").getAsString());
* }
* </pre>
*
* <br>
* See <i>HttpTest</i> for example. <br>
*
* @param stream - JsonObject TStream.
* @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
* @param uri - URI function which returns URI string
* @return TStream of JsonObject which contains responses of GET requests
*
* @see HttpStreams#requests(TStream, Supplier, Function, Function, BiFunction)
*/
public static TStream<JsonObject> getJson(TStream<JsonObject> stream,
Supplier<CloseableHttpClient> clientCreator,
Function<JsonObject,String> uri) {
return HttpStreams.<JsonObject,JsonObject>requests(stream, clientCreator,
t -> HttpGet.METHOD_NAME, uri, HttpResponders.json());
}
/**
* Make an HTTP DELETE request with JsonObject. <br>
*
* Method specifically works with JsonObjects. For each JsonObject in the
* stream, HTTP DELETE request is executed on provided uri. As a result,
* Response is added to the response TStream. <br>
*
* Sample usage:<br>
*
* <pre>
* {@code
* DirectProvider ep = new DirectProvider();
* Topology org.apache.edgent.org.apache.edgent.topology = ep.newTopology();
* final String url = "http://httpbin.org/delete?";
*
* JsonObject request = new JsonObject();
* request.addProperty("a", "abc");
* request.addProperty("b", "42");
*
* TStream<JsonObject> stream = org.apache.edgent.org.apache.edgent.topology.collection(Arrays.asList(request));
* TStream<JsonObject> rc = HttpStreams.deleteJson(stream,
* HttpClients::noAuthentication,
* t -> url + "a=" + t.get("a").getAsString() + "&b="
* + t.get("b").getAsString());
* }
* </pre>
*
* <br>
* See <i>HttpTest</i> for example. <br>
*
* @param stream - JsonObject TStream.
* @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
* @param uri - URI function which returns URI string
* @return TStream of JsonObject which contains responses of DELETE requests
*
* @see HttpStreams#requests(TStream, Supplier, Function, Function, BiFunction)
*/
public static TStream<JsonObject> deleteJson(TStream<JsonObject> stream,
Supplier<CloseableHttpClient> clientCreator,
Function<JsonObject,String> uri) {
return HttpStreams.<JsonObject,JsonObject>requests(stream, clientCreator,
t -> HttpDelete.METHOD_NAME, uri, HttpResponders.json());
}
/**
* Make an HTTP POST request with JsonObject. <br>
*
* Method specifically works with JsonObjects. For each JsonObject in the stream,
* HTTP POST request is executed on provided uri. Request body is filled using
* HttpEntity provided by body function. As a result, Response is added to
* the response TStream.<br>
*
* Sample usage:<br>
*
* <pre>
* {@code
* DirectProvider ep = new DirectProvider();
* Topology org.apache.edgent.org.apache.edgent.topology = ep.newTopology();
* final String url = "http://httpbin.org/post";
*
* JsonObject body = new JsonObject();
* body.addProperty("foo", "abc");
* body.addProperty("bar", 42);
*
* TStream<JsonObject> stream = org.apache.edgent.org.apache.edgent.topology.collection(Arrays.asList(body));
* TStream<JsonObject> rc = HttpStreams.postJson(stream,
* HttpClients::noAuthentication, t -> url, t -> t);
* }
* </pre>
*
* <br>
* See HttpTest for example. <br>
*
* @param stream - JsonObject TStream.
* @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
* @param uri - URI function which returns URI string
* @param body - Function that returns JsonObject which will be set as a body for the request.
* @return TStream of JsonObject which contains responses of POST requests
*
* @see HttpStreams#requestsWithBody(TStream, Supplier, Function, Function, Function, BiFunction)
*/
public static TStream<JsonObject> postJson(TStream<JsonObject> stream,
Supplier<CloseableHttpClient> clientCreator,
Function<JsonObject, String> uri,
UnaryOperator<JsonObject> body) {
return HttpStreams.<JsonObject, JsonObject> requestsWithBody(stream,
clientCreator, t -> HttpPost.METHOD_NAME, uri,
t -> new ByteArrayEntity(body.apply(t).toString().getBytes(StandardCharsets.UTF_8)),
HttpResponders.json());
}
/**
* Make an HTTP PUT request with JsonObject. <br>
*
* Method specifically works with JsonObjects. For each JsonObject in the
* stream, HTTP PUT request is executed on provided uri. Request body is
* filled using HttpEntity provided by body function. As a result, Response
* is added to the response TStream.<br>
*
* Sample usage:<br>
*
* <pre>
* {@code
* DirectProvider ep = new DirectProvider();
* Topology org.apache.edgent.org.apache.edgent.topology = ep.newTopology();
* final String url = "http://httpbin.org/put";
*
* JsonObject body = new JsonObject();
* body.addProperty("foo", "abc");
* body.addProperty("bar", 42);
*
* TStream<JsonObject> stream = org.apache.edgent.org.apache.edgent.topology.collection(Arrays.asList(body));
* TStream<JsonObject> rc = HttpStreams.putJson(stream,
* HttpClients::noAuthentication, t -> url, t -> t);
* }
* </pre>
*
* <br>
* See HttpTest for example. <br>
*
* @param stream - JsonObject TStream.
* @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
* @param uri - URI function which returns URI string
* @param body - Function that returns JsonObject which will be set as a body for the request.
* @return TStream of JsonObject which contains responses of PUT requests
*
* @see HttpStreams#requestsWithBody(TStream, Supplier, Function, Function, Function, BiFunction)
*/
public static TStream<JsonObject> putJson(TStream<JsonObject> stream,
Supplier<CloseableHttpClient> clientCreator,
Function<JsonObject, String> uri,
UnaryOperator<JsonObject> body) {
return HttpStreams.<JsonObject, JsonObject> requestsWithBody(stream,
clientCreator, t -> HttpPut.METHOD_NAME, uri,
t -> new ByteArrayEntity(body.apply(t).toString().getBytes(StandardCharsets.UTF_8)),
HttpResponders.json());
}
/**
* Make an HTTP request for each tuple on a stream.
* <UL>
* <LI>{@code clientCreator} is invoked once to create a new HTTP client
* to make the requests.
* </LI>
* <LI>
* {@code method} is invoked for each tuple to define the method
* to be used for the HTTP request driven by the tuple. A fixed method
* can be declared using a function such as:
* <UL style="list-style-type:none"><LI>{@code t -> HttpGet.METHOD_NAME}</LI></UL>
* </LI>
* <LI>
* {@code uri} is invoked for each tuple to define the URI
* to be used for the HTTP request driven by the tuple. A fixed method
* can be declared using a function such as:
* <UL style="list-style-type:none"><LI>{@code t -> "http://www.example.com"}</LI></UL>
* </LI>
* <LI>
* {@code response} is invoked after each request that did not throw an exception.
* It is passed the input tuple and the HTTP response. The function must completely
* consume the entity stream for the response. The return value is present on
* the stream returned by this method if it is non-null. A null return results
* in no tuple on the returned stream.
*
* </LI>
* </UL>
*
* @param <T> Tuple type for input stream
* @param <R> Tuple type for output stream
* @param stream Stream to invoke HTTP requests.
* @param clientCreator Function to create a HTTP client.
* @param method Function to define the HTTP method.
* @param uri Function to define the URI.
* @param response Function to process the response.
* @return Stream containing HTTP responses processed by the {@code response} function.
*
* @see HttpClients
* @see HttpResponders
*/
public static <T,R> TStream<R> requests(TStream<T> stream,
Supplier<CloseableHttpClient> clientCreator,
Function<T,String> method,
Function<T,String> uri,
BiFunction<T,CloseableHttpResponse,R> response) {
return stream.map(new HttpRequester<T,R>(clientCreator, method, uri, response));
}
/**
* Make an HTTP request with body for each tuple.<br>
*
* @param <T> Tuple type for input stream
* @param <R> Tuple type for output stream
* @param stream Stream to invoke HTTP requests.
* @param clientCreator Function to create a HTTP client.
* @param method Function to define the HTTP method.
* @param uri Function to define the URI.
* @param body Function to define the HTTP request body
* @param response Function to process the response.
* @return Stream containing HTTP responses processed by the {@code response} function.
*
* @see HttpStreams#requests(TStream, Supplier, Function, Function, BiFunction)
* @see HttpClients
* @see HttpResponders
*
*/
public static <T, R> TStream<R> requestsWithBody(TStream<T> stream,
Supplier<CloseableHttpClient> clientCreator,
Function<T, String> method,
Function<T, String> uri,
Function<T, HttpEntity> body,
BiFunction<T, CloseableHttpResponse, R> response) {
return stream.map(new HttpRequester<T, R>(clientCreator, method, uri, body, response));
}
}