| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.htrace.core; |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.ObjectWriter; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.EOFException; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.nio.file.FileSystems; |
| import java.nio.file.StandardOpenOption; |
| import java.util.UUID; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| /** |
| * Writes the spans it receives to a local file. |
| */ |
| public class LocalFileSpanReceiver extends SpanReceiver { |
| private static final Log LOG = LogFactory.getLog(LocalFileSpanReceiver.class); |
| public static final String PATH_KEY = "local.file.span.receiver.path"; |
| public static final String CAPACITY_KEY = "local.file.span.receiver.capacity"; |
| public static final int CAPACITY_DEFAULT = 5000; |
| private static ObjectWriter JSON_WRITER = new ObjectMapper().writer(); |
| private final String path; |
| |
| private byte[][] bufferedSpans; |
| private int bufferedSpansIndex; |
| private final ReentrantLock bufferLock = new ReentrantLock(); |
| |
| private final FileOutputStream stream; |
| private final FileChannel channel; |
| private final ReentrantLock channelLock = new ReentrantLock(); |
| |
| public LocalFileSpanReceiver(HTraceConfiguration conf) { |
| int capacity = conf.getInt(CAPACITY_KEY, CAPACITY_DEFAULT); |
| if (capacity < 1) { |
| throw new IllegalArgumentException(CAPACITY_KEY + " must not be " + |
| "less than 1."); |
| } |
| String pathStr = conf.get(PATH_KEY); |
| if (pathStr == null || pathStr.isEmpty()) { |
| path = getUniqueLocalTraceFileName(); |
| } else { |
| path = pathStr; |
| } |
| boolean success = false; |
| try { |
| this.stream = new FileOutputStream(path, true); |
| } catch (IOException ioe) { |
| LOG.error("Error opening " + path + ": " + ioe.getMessage()); |
| throw new RuntimeException(ioe); |
| } |
| this.channel = stream.getChannel(); |
| if (this.channel == null) { |
| try { |
| this.stream.close(); |
| } catch (IOException e) { |
| LOG.error("Error closing " + path, e); |
| } |
| LOG.error("Failed to get channel for " + path); |
| throw new RuntimeException("Failed to get channel for " + path); |
| } |
| this.bufferedSpans = new byte[capacity][]; |
| this.bufferedSpansIndex = 0; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Created new LocalFileSpanReceiver with path = " + path + |
| ", capacity = " + capacity); |
| } |
| } |
| |
| /** |
| * Number of buffers to use in FileChannel#write. |
| * |
| * On UNIX, FileChannel#write uses writev-- a kernel interface that allows |
| * us to send multiple buffers at once. This is more efficient than making a |
| * separate write call for each buffer, since it minimizes the number of |
| * transitions from userspace to kernel space. |
| */ |
| private final int WRITEV_SIZE = 20; |
| |
| private final static ByteBuffer newlineBuf = |
| ByteBuffer.wrap(new byte[] { (byte)0xa }); |
| |
| /** |
| * Flushes a bufferedSpans array. |
| */ |
| private void doFlush(byte[][] toFlush, int len) throws IOException { |
| int bidx = 0, widx = 0; |
| ByteBuffer writevBufs[] = new ByteBuffer[2 * WRITEV_SIZE]; |
| |
| while (true) { |
| if (widx == writevBufs.length) { |
| channel.write(writevBufs); |
| widx = 0; |
| } |
| if (bidx == len) { |
| break; |
| } |
| writevBufs[widx] = ByteBuffer.wrap(toFlush[bidx]); |
| writevBufs[widx + 1] = newlineBuf; |
| bidx++; |
| widx+=2; |
| } |
| if (widx > 0) { |
| channel.write(writevBufs, 0, widx); |
| } |
| } |
| |
| @Override |
| public void receiveSpan(Span span) { |
| // Serialize the span data into a byte[]. Note that we're not holding the |
| // lock here, to improve concurrency. |
| byte jsonBuf[] = null; |
| try { |
| jsonBuf = JSON_WRITER.writeValueAsBytes(span); |
| } catch (JsonProcessingException e) { |
| LOG.error("receiveSpan(path=" + path + ", span=" + span + "): " + |
| "Json processing error: " + e.getMessage()); |
| return; |
| } |
| |
| // Grab the bufferLock and put our jsonBuf into the list of buffers to |
| // flush. |
| byte toFlush[][] = null; |
| bufferLock.lock(); |
| try { |
| if (bufferedSpans == null) { |
| LOG.debug("receiveSpan(path=" + path + ", span=" + span + "): " + |
| "LocalFileSpanReceiver for " + path + " is closed."); |
| return; |
| } |
| bufferedSpans[bufferedSpansIndex] = jsonBuf; |
| bufferedSpansIndex++; |
| if (bufferedSpansIndex == bufferedSpans.length) { |
| // If we've hit the limit for the number of buffers to flush, |
| // swap out the existing bufferedSpans array for a new array, and |
| // prepare to flush those spans to disk. |
| toFlush = bufferedSpans; |
| bufferedSpansIndex = 0; |
| bufferedSpans = new byte[bufferedSpans.length][]; |
| } |
| } finally { |
| bufferLock.unlock(); |
| } |
| if (toFlush != null) { |
| // We released the bufferLock above, to avoid blocking concurrent |
| // receiveSpan calls. But now, we must take the channelLock, to make |
| // sure that we have sole access to the output channel. If we did not do |
| // this, we might get interleaved output. |
| // |
| // There is a small chance that another thread doing a flush of more |
| // recent spans could get ahead of us here, and take the lock before we |
| // do. This is ok, since spans don't have to be written out in order. |
| channelLock.lock(); |
| try { |
| doFlush(toFlush, toFlush.length); |
| } catch (IOException ioe) { |
| LOG.error("Error flushing buffers to " + path + ": " + |
| ioe.getMessage()); |
| } finally { |
| channelLock.unlock(); |
| } |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| byte toFlush[][] = null; |
| int numToFlush = 0; |
| bufferLock.lock(); |
| try { |
| if (bufferedSpans == null) { |
| LOG.info("LocalFileSpanReceiver for " + path + " was already closed."); |
| return; |
| } |
| numToFlush = bufferedSpansIndex; |
| bufferedSpansIndex = 0; |
| toFlush = bufferedSpans; |
| bufferedSpans = null; |
| } finally { |
| bufferLock.unlock(); |
| } |
| channelLock.lock(); |
| try { |
| doFlush(toFlush, numToFlush); |
| } catch (IOException ioe) { |
| LOG.error("Error flushing buffers to " + path + ": " + |
| ioe.getMessage()); |
| } finally { |
| try { |
| stream.close(); |
| } catch (IOException e) { |
| LOG.error("Error closing stream for " + path, e); |
| } |
| channelLock.unlock(); |
| } |
| } |
| |
| public static String getUniqueLocalTraceFileName() { |
| String tmp = System.getProperty("java.io.tmpdir", "/tmp"); |
| String nonce = null; |
| BufferedReader reader = null; |
| try { |
| // On Linux we can get a unique local file name by reading the process id |
| // out of /proc/self/stat. (There isn't any portable way to get the |
| // process ID from Java.) |
| reader = new BufferedReader( |
| new InputStreamReader(new FileInputStream("/proc/self/stat"), |
| "UTF-8")); |
| String line = reader.readLine(); |
| if (line == null) { |
| throw new EOFException(); |
| } |
| nonce = line.split(" ")[0]; |
| } catch (IOException e) { |
| } finally { |
| if (reader != null) { |
| try { |
| reader.close(); |
| } catch(IOException e) { |
| LOG.warn("Exception in closing " + reader, e); |
| } |
| } |
| } |
| if (nonce == null) { |
| // If we can't use the process ID, use a random nonce. |
| nonce = UUID.randomUUID().toString(); |
| } |
| return new File(tmp, nonce).getAbsolutePath(); |
| } |
| } |