blob: e8bd3fc9a535d221fe43b3d27efef820620deb3a [file] [log] [blame]
/*
* RED5 Open Source Flash Server - http://code.google.com/p/red5/
*
* Copyright 2006-2012 by respective authors (see below). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.red5.client.net.rtmpt;
import java.io.IOException;
import java.util.List;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.HttpVersion;
import org.apache.http.ParseException;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.util.EntityUtils;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.DummySession;
import org.apache.mina.core.session.IoSession;
import org.red5.client.net.rtmp.RTMPClientConnManager;
import org.red5.server.net.protocol.ProtocolState;
import org.red5.server.net.rtmp.RTMPConnection;
import org.red5.server.net.rtmp.codec.RTMP;
import org.red5.server.net.rtmp.message.Constants;
import org.red5.server.util.HttpConnectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Client connector for RTMPT
*
* @author Anton Lebedevich (mabrek@gmail.com)
* @author Paul Gregoire (mondain@gmail.com)
*/
class RTMPTClientConnector extends Thread {
private static final Logger log = LoggerFactory.getLogger(RTMPTClientConnector.class);
private static final String CONTENT_TYPE = "application/x-fcs";
private static final ByteArrayEntity ZERO_REQUEST_ENTITY = new ByteArrayEntity(new byte[] { 0 });
/**
* Size to split messages queue by, borrowed from
* RTMPTServlet.RESPONSE_TARGET_SIZE
*/
private static final int SEND_TARGET_SIZE = 32768;
private final DefaultHttpClient httpClient = HttpConnectionUtil.getClient();
private final HttpHost targetHost;
private final RTMPTClient client;
private final RTMPClientConnManager connManager;
private int clientId;
private long messageCount = 1;
private volatile boolean stopRequested = false;
public RTMPTClientConnector(String server, int port, RTMPTClient client) {
targetHost = new HttpHost(server, port, "http");
httpClient.getParams().setParameter(CoreProtocolPNames.PROTOCOL_VERSION, HttpVersion.HTTP_1_1);
this.client = client;
this.connManager = RTMPClientConnManager.getInstance();
}
public void run() {
HttpPost post = null;
try {
RTMPTClientConnection connection = openConnection();
while (!connection.isClosing() && !stopRequested) {
IoBuffer toSend = connection.getPendingMessages(SEND_TARGET_SIZE);
int limit = toSend != null ? toSend.limit() : 0;
if (limit > 0) {
post = makePost("send");
post.setEntity(new InputStreamEntity(toSend.asInputStream(), limit));
post.addHeader("Content-Type", CONTENT_TYPE);
} else {
post = makePost("idle");
post.setEntity(ZERO_REQUEST_ENTITY);
}
// execute
HttpResponse response = httpClient.execute(targetHost, post);
// check for error
checkResponseCode(response);
// handle data
byte[] received = EntityUtils.toByteArray(response.getEntity());
IoBuffer data = IoBuffer.wrap(received);
if (data.limit() > 0) {
data.skip(1); // XXX: polling interval lies in this byte
}
List<?> messages = connection.decode(data);
if (messages == null || messages.isEmpty()) {
try {
// XXX handle polling delay
Thread.sleep(250);
} catch (InterruptedException e) {
if (stopRequested) {
post.abort();
break;
}
}
continue;
}
IoSession session = new DummySession();
session.setAttribute(RTMPConnection.RTMP_CONNECTION_KEY, connection);
session.setAttribute(ProtocolState.SESSION_KEY, connection.getState());
for (Object message : messages) {
try {
client.messageReceived(message, session);
} catch (Exception e) {
log.error("Could not process message.", e);
}
}
}
finalizeConnection();
client.connectionClosed(connection, connection.getState());
} catch (Throwable e) {
log.debug("RTMPT handling exception", e);
client.handleException(e);
if (post != null) {
post.abort();
}
}
}
private RTMPTClientConnection openConnection() throws IOException {
RTMPTClientConnection connection = null;
HttpPost openPost = new HttpPost("/open/1");
setCommonHeaders(openPost);
openPost.setEntity(ZERO_REQUEST_ENTITY);
// execute
HttpResponse response = httpClient.execute(targetHost, openPost);
checkResponseCode(response);
// get the response entity
HttpEntity entity = response.getEntity();
if (entity != null) {
String responseStr = EntityUtils.toString(entity);
clientId = Integer.parseInt(responseStr.substring(0, responseStr.length() - 1));
log.debug("Got client id {}", clientId);
// create a new connection
connection = (RTMPTClientConnection) connManager.createConnection(RTMPTClientConnection.class);
// client state
RTMP state = new RTMP();
connection.setState(state);
connection.setHandler(client);
connection.setDecoder(client.getDecoder());
connection.setEncoder(client.getEncoder());
log.debug("Handshake 1st phase");
IoBuffer handshake = IoBuffer.allocate(Constants.HANDSHAKE_SIZE + 1);
handshake.put((byte) 0x03);
handshake.fill((byte) 0x01, Constants.HANDSHAKE_SIZE);
handshake.flip();
connection.writeRaw(handshake);
}
return connection;
}
private void finalizeConnection() throws IOException {
log.debug("Sending close post");
HttpPost closePost = new HttpPost(makeUrl("close"));
closePost.setEntity(ZERO_REQUEST_ENTITY);
HttpResponse response = httpClient.execute(targetHost, closePost);
EntityUtils.consume(response.getEntity());
}
private HttpPost makePost(String command) {
HttpPost post = new HttpPost(makeUrl(command));
setCommonHeaders(post);
return post;
}
private String makeUrl(String command) {
// use message count from connection
return String.format("/%s/%s/%s", command, clientId, messageCount++);
}
private void setCommonHeaders(HttpPost post) {
post.addHeader("Connection", "Keep-Alive");
post.addHeader("Cache-Control", "no-cache");
}
private void checkResponseCode(HttpResponse response) throws ParseException, IOException {
int code = response.getStatusLine().getStatusCode();
if (code != HttpStatus.SC_OK) {
throw new RuntimeException("Bad HTTP status returned, line: " + response.getStatusLine() + "; body: " + EntityUtils.toString(response.getEntity()));
}
}
public void setStopRequested(boolean stopRequested) {
this.stopRequested = stopRequested;
}
}