blob: 069f957b04fb0e55644f21673ffb2e1980ac9f23 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.netty.example
import java.util
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import com.alibaba.fastjson.JSONObject
import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.methods.{HttpGet, HttpPost}
import org.apache.http.impl.client.HttpClients
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.EntityUtils
import org.slf4j.LoggerFactory
/** base test util */
class BaseTest {
lazy val logger = LoggerFactory.getLogger(getClass)
private lazy val httpclient = HttpClients.createDefault()
private lazy val queue = new LinkedBlockingQueue[JSONObject]()
private lazy val schedule = Executors.newScheduledThreadPool(20)
private lazy val pool = Executors.newCachedThreadPool()
def schedule(period: Int, f: () => Unit): Unit = {
schedule.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
f.apply()
}
}, 3, period, TimeUnit.SECONDS)
}
def run(f: () => Unit): Unit = {
pool.submit(new Runnable {
override def run(): Unit = {
f.apply()
}
})
}
def sendGetRequest(url: String): String = {
val httpGet = new HttpGet(url)
val response1 = httpclient.execute(httpGet)
try {
logger.info(s"response: ${response1.getStatusLine}, url:$url")
val entity = response1.getEntity
EntityUtils.toString(entity)
} finally {
response1.close()
}
}
def sendPostRequest(url: String, map: Map[String, String]): String = {
val httpPost = new HttpPost(url)
val nvps = new util.ArrayList[NameValuePair]()
map.foreach { kv =>
nvps.add(new BasicNameValuePair(kv._1, kv._2))
}
httpPost.setEntity(new UrlEncodedFormEntity(nvps))
val response = httpclient.execute(httpPost)
try {
logger.info("response status line:" + response.getStatusLine)
val entity2 = response.getEntity
EntityUtils.toString(entity2)
} finally {
response.close()
}
}
}