blob: e97138d88b0f4c3b69f259dfebaff2120fb92c85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.writer;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.Chunk;
import java.util.ArrayList;
import org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter;
import org.apache.hadoop.io.IOUtils;
import java.net.*;
import java.io.*;
/**
 * Pushes chunks through a pipeline made of a {@link SocketTeeWriter} followed
 * by a {@link CaptureWriter}, and checks two things after every push:
 * the capture writer's output count grows by one, and a client connected to
 * the tee's port receives exactly the chunks matching the filter it asked for.
 *
 * Three client modes are exercised in turn, each on its own connection:
 * {@code WRITABLE} (chunk read back with {@code ChunkImpl.read}),
 * {@code RAW} (length-prefixed payload bytes) and {@code ASCII_HEADER}
 * (length-prefixed text header plus payload).
 */
public class TestSocketTee extends TestCase{

  /** Size of the reply consumed after each command ("OK\n"). */
  private static final int ACK_LENGTH = 3;

  /**
   * Runs the whole scenario against the tee listening on
   * {@code SocketTeeWriter.DEFAULT_PORT} on localhost.
   *
   * NOTE(review): no read timeout is set on the client sockets, so a tee that
   * never answers makes this test block instead of fail.
   *
   * @throws Exception on any pipeline or socket error
   */
  public void testSocketTee() throws Exception {
    Configuration conf = new Configuration();
    conf.set("chukwa.pipeline",
        SocketTeeWriter.class.getCanonicalName()+","// note comma
        + CaptureWriter.class.getCanonicalName());
    conf.set("chukwa.writerClass",
        PipelineStageWriter.class.getCanonicalName());
    PipelineStageWriter psw = new PipelineStageWriter(conf);
    System.out.println("pipeline established; now pushing a chunk");

    // No client is connected yet: the chunk must still reach the capture writer.
    push(psw, new ChunkImpl("dt", "name", 1, new byte[] {'a'}, null));
    assertEquals(1, CaptureWriter.outputs.size());

    // --- WRITABLE mode, filtering on datatype=dt3 ---
    System.out.println("connecting to localhost");
    Socket s = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
    try {
      DataOutputStream dos = new DataOutputStream(s.getOutputStream());
      dos.write((SocketTeeWriter.WRITABLE + " datatype=dt3\n").getBytes());
      DataInputStream dis = new DataInputStream(s.getInputStream());
      System.out.println("command send");
      consumeAck(dis);

      // A chunk that does not match the filter: only the capture writer sees it.
      push(psw, new ChunkImpl("dt2", "name", 1, new byte[] {'b'}, null));
      assertEquals(2, CaptureWriter.outputs.size());
      System.out.println("sent nonmatching chunk");

      // A matching chunk: it must become readable from the socket.
      push(psw, new ChunkImpl("dt3", "name", 1, new byte[] {'c'}, null));
      assertEquals(3, CaptureWriter.outputs.size());
      System.out.println("sent matching chunk");

      System.out.println("reading...");
      ChunkImpl received = ChunkImpl.read(dis);
      assertEquals("dt3", received.getDataType());
      System.out.println(received);
    } finally {
      // Closing the socket also closes both of its streams.
      s.close();
    }

    // --- RAW mode, filtering on content matching .*d.* ---
    Socket s2 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
    try {
      s2.getOutputStream().write((SocketTeeWriter.RAW+" content=.*d.*\n").getBytes());
      DataInputStream dis = new DataInputStream(s2.getInputStream());
      consumeAck(dis);

      push(psw, new ChunkImpl("dt3", "name", 1, new byte[] {'d'}, null));
      assertEquals(4, CaptureWriter.outputs.size());

      // The payload arrives as an int length followed by the bytes themselves.
      int len = dis.readInt();
      assertEquals(1, len);
      byte[] data = new byte[100];
      int read = dis.read(data);
      assertEquals(1, read);
      assertEquals((byte) 'd', data[0]);
    } finally {
      s2.close();
    }

    // Pushed while no client is connected (the previous one just closed);
    // the pipeline must keep delivering to the capture writer.
    push(psw, new ChunkImpl("dt3", "name", 3, new byte[] {'c', 'a', 'd'}, null));
    assertEquals(5, CaptureWriter.outputs.size());

    // --- ASCII_HEADER mode, no filtering ("all") ---
    Socket s3 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
    try {
      s3.getOutputStream().write((SocketTeeWriter.ASCII_HEADER+" all\n").getBytes());
      DataInputStream dis = new DataInputStream(s3.getInputStream());
      consumeAck(dis);

      ChunkImpl withSource =
          new ChunkImpl("dataTypeFoo", "streamName", 4, new byte[] {'t','e','x','t'}, null);
      withSource.setSource("hostNameFoo");
      push(psw, withSource);
      assertEquals(6, CaptureWriter.outputs.size());

      int len = dis.readInt();
      byte[] data = new byte[len];
      IOUtils.readFully(dis, data, 0, len);
      String rcvd = new String(data);
      // Report the size of the buffer just filled, not a count left over
      // from an earlier read on another connection.
      System.out.println("got " + data.length+"/" +len +" bytes: " + rcvd);
      assertEquals("hostNameFoo dataTypeFoo streamName 4\ntext", rcvd);
    } finally {
      s3.close();
    }
  }

  /** Sends a single chunk through the pipeline as a one-element list. */
  private static void push(PipelineStageWriter psw, Chunk chunk) throws Exception {
    ArrayList<Chunk> batch = new ArrayList<Chunk>();
    batch.add(chunk);
    psw.add(batch);
  }

  /** Reads and discards the fixed-size reply that follows a command. */
  private static void consumeAck(DataInputStream in) throws IOException {
    in.readFully(new byte[ACK_LENGTH]);
  }
}