blob: ac748d50f9e684119af9d99fcd78cc2218bd2667 [file] [log] [blame]
package org.apache.hadoop.chukwa.pig;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.Calendar;
import java.util.Map;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
public class ChukwaStorer extends StoreFunc {
RecordWriter<ChukwaRecordKey, ChukwaRecord> writer;
Calendar calendar = Calendar.getInstance();
int timestampFieldIndex = -1;
int pkFieldIndex = -1;
int sourceFieldIndex = -1;
int clusterNameFieldIndex = -1;
int recordTypeFieldIndex = -1;
int applicationFieldIndex = -1;
String[] fields = null;
public ChukwaStorer() {
}
public ChukwaStorer(String... scfields ) {
this.fields = scfields;
for (int i=0;i< scfields.length;i++) {
if (scfields[i].equalsIgnoreCase("c_timestamp")) {
timestampFieldIndex = i;
} else if (scfields[i].equalsIgnoreCase("c_pk")) {
pkFieldIndex = i;
} else if (scfields[i].equalsIgnoreCase("c_source")) {
sourceFieldIndex = i;
} else if (scfields[i].equalsIgnoreCase("c_recordtype")) {
recordTypeFieldIndex =i;
} else if (scfields[i].equalsIgnoreCase("c_application")) {
applicationFieldIndex =i;
} else if (scfields[i].equalsIgnoreCase("c_cluster")) {
clusterNameFieldIndex =i;
}
}
}
@Override
public void putNext(Tuple f) throws IOException {
long timePartition = 0l;
long timestamp = 0L;
String source = "N/A";
String application = "N/A";
String recordType = "N/A";
String clusterName = "N/A";
String pk = "";
try {
ChukwaRecordKey key = new ChukwaRecordKey();
ChukwaRecord record = new ChukwaRecord();
record.setTime(System.currentTimeMillis());
int inputSize = f.size();
for(int i=0;i<inputSize;i++) {
Object field = f.get(i);
if (field == null) {
continue;
}
if (i == this.pkFieldIndex) {
pk = field.toString();
continue;
} else if ( i == this.sourceFieldIndex) {
source = field.toString();
continue;
}else if ( i== this.recordTypeFieldIndex) {
recordType = field.toString();
continue;
}else if ( i== this.applicationFieldIndex) {
application = field.toString();
continue;
} else if ( i== this.clusterNameFieldIndex) {
clusterName = field.toString();
continue;
}else if (i == this.timestampFieldIndex) {
timestamp = Long.parseLong(field.toString());
record.setTime(timestamp);
synchronized (calendar)
{
calendar.setTimeInMillis(timestamp);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
timePartition = calendar.getTimeInMillis();
}
record.setTime(Long.parseLong(field.toString()));
continue;
} else if (field instanceof Map) {
Map<Object, Object> m = (Map<Object, Object>)field;
for(Object o: m.keySet()) {
record.add(o.toString(),m.get(o).toString());
}
continue;
} else {
if (i <fields.length ) {
record.add(fields[i],field.toString());
} else {
record.add("field-"+i,field.toString());
}
continue;
}
}
record.add(Record.tagsField, " cluster=\"" + clusterName.trim() + "\" ");
record.add(Record.sourceField, source);
record.add(Record.applicationField, application);
key.setKey("" + timePartition + "/" + pk + "/" + timestamp);
key.setReduceType(recordType);
writer.write(key, record);
} catch (ExecException e) {
IOException ioe = new IOException();
ioe.initCause(e);
throw ioe;
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public OutputFormat getOutputFormat() throws IOException {
return new SequenceFileOutputFormat<ChukwaRecordKey, ChukwaRecord>();
}
@SuppressWarnings("unchecked")
@Override
public void prepareToWrite(RecordWriter writer) throws IOException {
this.writer = writer;
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
FileOutputFormat.setOutputPath(job, new Path(location));
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
job.setOutputKeyClass(ChukwaRecordKey.class);
job.setOutputValueClass(ChukwaRecord.class);
}
}