blob: 11c9ebb7ff6dc3b22a281167ff6d561a3413bf7b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.ipc;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.EOFException;
import java.net.Proxy;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.net.URL;
import java.net.HttpURLConnection;
/** An HTTP-based {@link Transceiver} implementation. */
public class HttpTransceiver extends Transceiver {
static final String CONTENT_TYPE = "avro/binary";
private URL url;
private Proxy proxy;
private HttpURLConnection connection;
private int timeout;
public HttpTransceiver(URL url) { this.url = url; }
public HttpTransceiver(URL url, Proxy proxy) {
this(url);
this.proxy = proxy;
}
/** Set the connect and read timeouts, in milliseconds. */
public void setTimeout(int timeout) { this.timeout = timeout; }
public String getRemoteName() { return this.url.toString(); }
public synchronized List<ByteBuffer> readBuffers() throws IOException {
InputStream in = connection.getInputStream();
try {
return readBuffers(in);
} finally {
in.close();
}
}
public synchronized void writeBuffers(List<ByteBuffer> buffers)
throws IOException {
if (proxy == null)
connection = (HttpURLConnection)url.openConnection();
else
connection = (HttpURLConnection)url.openConnection(proxy);
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", CONTENT_TYPE);
connection.setRequestProperty("Content-Length",
Integer.toString(getLength(buffers)));
connection.setDoOutput(true);
connection.setReadTimeout(timeout);
connection.setConnectTimeout(timeout);
OutputStream out = connection.getOutputStream();
try {
writeBuffers(buffers, out);
} finally {
out.close();
}
}
static int getLength(List<ByteBuffer> buffers) {
int length = 0;
for (ByteBuffer buffer : buffers) {
length += 4;
length += buffer.remaining();
}
length += 4;
return length;
}
static List<ByteBuffer> readBuffers(InputStream in)
throws IOException {
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
while (true) {
int length = (in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
if (length == 0) { // end of buffers
return buffers;
}
ByteBuffer buffer = ByteBuffer.allocate(length);
while (buffer.hasRemaining()) {
int p = buffer.position();
int i = in.read(buffer.array(), p, buffer.remaining());
if (i < 0)
throw new EOFException("Unexpected EOF");
buffer.position(p+i);
}
buffer.flip();
buffers.add(buffer);
}
}
static void writeBuffers(List<ByteBuffer> buffers, OutputStream out)
throws IOException {
for (ByteBuffer buffer : buffers) {
writeLength(buffer.limit(), out); // length-prefix
out.write(buffer.array(), buffer.position(), buffer.remaining());
buffer.position(buffer.limit());
}
writeLength(0, out); // null-terminate
}
private static void writeLength(int length, OutputStream out)
throws IOException {
out.write(0xff & (length >>> 24));
out.write(0xff & (length >>> 16));
out.write(0xff & (length >>> 8));
out.write(0xff & length);
}
}