// blob: f1f11a17be47705d30619e2bb194ffcfca8d738d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.pipes;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import javax.crypto.SecretKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
/**
 * Shared plumbing for stub programs that connect to a socket on the local
 * host, on the port named by the {@code mapreduce.pipes.command.port}
 * environment variable, and exchange length-prefixed {@link Writable} values
 * over it.
 *
 * <p>The method names {@code initSoket} and {@code closeSoket} are spelled as
 * in the original API and are kept so that existing subclasses keep compiling.
 */
public class CommonStub {

  /** Connection opened by {@link #initSoket()}; {@code null} until then. */
  protected Socket socket = null;

  /** Input side of {@link #socket}; assigned by {@link #initSoket()}. */
  protected DataInputStream dataInput;

  /** Output side of {@link #socket}; assigned by {@link #initSoket()}. */
  protected DataOutputStream dataOut;

  /**
   * Computes a digest of {@code data} keyed by {@code password}.
   *
   * @param password raw bytes handed to
   *     {@link JobTokenSecretManager#createSecretKey(byte[])}
   * @param data the string to hash
   * @return the result of {@link SecureShuffleUtils#hashFromString} for
   *     {@code data} and the derived key
   * @throws IOException if the hashing helper fails
   */
  protected String createDigest(byte[] password, String data) throws IOException {
    SecretKey key = JobTokenSecretManager.createSecretKey(password);
    return SecureShuffleUtils.hashFromString(data, key);
  }

  /**
   * Reads one length-prefixed value from {@code inStream} into {@code obj}.
   *
   * <p>A VInt length is always consumed first. For {@link BytesWritable} and
   * {@link Text} exactly that many bytes are read and stored directly, so a
   * string "abc" emitted by the peer shows up as "abc". For any other
   * {@link Writable} the length is discarded and the object deserializes
   * itself with {@link Writable#readFields}.
   *
   * @param obj the object to populate
   * @param inStream the stream to read from
   * @throws IOException if the stream fails, ends early, or carries a
   *     negative length for a {@code BytesWritable}/{@code Text} payload
   */
  protected void readObject(Writable obj, DataInputStream inStream) throws IOException {
    int numBytes = WritableUtils.readVInt(inStream);
    if (obj instanceof BytesWritable) {
      byte[] buffer = readPayload(inStream, numBytes);
      ((BytesWritable) obj).set(buffer, 0, numBytes);
    } else if (obj instanceof Text) {
      byte[] buffer = readPayload(inStream, numBytes);
      ((Text) obj).set(buffer);
    } else {
      obj.readFields(inStream);
    }
  }

  /**
   * Reads exactly {@code numBytes} bytes from {@code inStream}.
   *
   * <p>A negative length is reported as an {@link IOException} instead of
   * surfacing as an unchecked {@link NegativeArraySizeException}.
   */
  private static byte[] readPayload(DataInputStream inStream, int numBytes)
      throws IOException {
    if (numBytes < 0) {
      throw new IOException("Negative payload length read from stream: " + numBytes);
    }
    byte[] buffer = new byte[numBytes];
    inStream.readFully(buffer);
    return buffer;
  }

  /**
   * Writes {@code obj} to {@code stream} as a VInt length followed by the
   * payload bytes, then flushes the stream.
   *
   * <p>{@link Text} and {@link BytesWritable} are encoded directly from their
   * backing arrays so that they arrive at the peer as the natural
   * translations. Any other {@link Writable} is first serialized into a
   * temporary buffer to learn its length.
   *
   * @param obj the value to send
   * @param stream the stream to write to
   * @throws IOException if writing or flushing fails
   */
  protected void writeObject(Writable obj, DataOutputStream stream)
      throws IOException {
    if (obj instanceof Text) {
      Text t = (Text) obj;
      int len = t.getLength();
      // writeVInt mirrors the readVInt used by readObject.
      WritableUtils.writeVInt(stream, len);
      stream.write(t.getBytes(), 0, len);
    } else if (obj instanceof BytesWritable) {
      BytesWritable b = (BytesWritable) obj;
      int len = b.getLength();
      WritableUtils.writeVInt(stream, len);
      stream.write(b.getBytes(), 0, len);
    } else {
      // Only the generic path needs a scratch buffer, so allocate it here.
      DataOutputBuffer buffer = new DataOutputBuffer();
      obj.write(buffer);
      int length = buffer.getLength();
      WritableUtils.writeVInt(stream, length);
      stream.write(buffer.getData(), 0, length);
    }
    // A single flush per record is enough; the bytes written are unchanged.
    stream.flush();
  }

  /**
   * Opens {@link #socket} to the local host and runs the opening exchange:
   * reads the peer's first message, answers with a digest, prints the
   * protocol version the peer reports, and consumes the configuration array.
   *
   * <p>On return {@link #dataInput} and {@link #dataOut} wrap the socket's
   * streams. If this method throws after the socket was created, the socket
   * stays assigned so that {@link #closeSoket()} can release it.
   *
   * @throws NumberFormatException if {@code mapreduce.pipes.command.port} is
   *     unset or is not an integer
   * @throws Exception if connecting or any read/write on the socket fails
   */
  protected void initSoket() throws Exception {
    String portValue = System.getenv("mapreduce.pipes.command.port");
    if (portValue == null) {
      // Same exception type Integer.parseInt(null) raises, with a usable message.
      throw new NumberFormatException(
          "Environment variable mapreduce.pipes.command.port is not set");
    }
    int port = Integer.parseInt(portValue);

    java.net.InetAddress address = java.net.InetAddress.getLocalHost();
    socket = new Socket(address.getHostName(), port);
    InputStream input = socket.getInputStream();
    OutputStream output = socket.getOutputStream();

    // Read the peer's opening message: a VInt, the string that gets
    // digested below, and a second string that is not used here.
    // NOTE(review): presumably message code, challenge and expected
    // response digest - confirm against the protocol definition.
    dataInput = new DataInputStream(input);
    WritableUtils.readVInt(dataInput);
    String str = Text.readString(dataInput);
    Text.readString(dataInput);

    // Reply with a VInt 57 followed by the digest of the received string.
    // NOTE(review): 57 is presumably the authentication-response message
    // code - confirm against the protocol's MessageType values.
    dataOut = new DataOutputStream(output);
    WritableUtils.writeVInt(dataOut, 57);
    // Fix the charset so the key bytes do not depend on the platform default.
    String s = createDigest(
        "password".getBytes(java.nio.charset.StandardCharsets.UTF_8), str);
    Text.writeString(dataOut, s);

    // start
    WritableUtils.readVInt(dataInput);
    int currentAnswer = WritableUtils.readVInt(dataInput);
    System.out.println("CURRENT_PROTOCOL_VERSION:" + currentAnswer);

    // get configuration
    // should be MessageType.SET_JOB_CONF.code
    WritableUtils.readVInt(dataInput);
    // array length
    int j = WritableUtils.readVInt(dataInput);
    // Two strings are consumed per step, so advance the index by two.
    // NOTE(review): presumably alternating keys and values - confirm.
    for (int i = 0; i < j; i += 2) {
      Text.readString(dataInput);
      Text.readString(dataInput);
    }
  }

  /**
   * Closes {@link #socket} if it was opened. A failure to close is printed
   * and otherwise ignored, so this is safe to call from a {@code finally}
   * block.
   */
  protected void closeSoket() {
    if (socket != null) {
      try {
        socket.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}