| // Copyright 2015 RedHat, Inc. |
| // Copyright 2015 CoreOS, Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package sdjournal |
| |
| import ( |
| "errors" |
| "fmt" |
| "io" |
| "log" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
// ErrExpired is the error Follow returns once a value has been received on
// its until channel, i.e. when the caller-supplied timeout fires.
var ErrExpired = errors.New("Timeout expired")
| |
| // JournalReaderConfig represents options to drive the behavior of a JournalReader. |
| type JournalReaderConfig struct { |
| // The Since, NumFromTail and Cursor options are mutually exclusive and |
| // determine where the reading begins within the journal. The order in which |
| // options are written is exactly the order of precedence. |
| Since time.Duration // start relative to a Duration from now |
| NumFromTail uint64 // start relative to the tail |
| Cursor string // start relative to the cursor |
| |
| // Show only journal entries whose fields match the supplied values. If |
| // the array is empty, entries will not be filtered. |
| Matches []Match |
| |
| // If not empty, the journal instance will point to a journal residing |
| // in this directory. The supplied path may be relative or absolute. |
| Path string |
| |
| // If not nil, Formatter will be used to translate the resulting entries |
| // into strings. If not set, the default format (timestamp and message field) |
| // will be used. If Formatter returns an error, Read will stop and return the error. |
| Formatter func(entry *JournalEntry) (string, error) |
| } |
| |
| // JournalReader is an io.ReadCloser which provides a simple interface for iterating through the |
| // systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines. |
| type JournalReader struct { |
| journal *Journal |
| msgReader *strings.Reader |
| formatter func(entry *JournalEntry) (string, error) |
| } |
| |
| // NewJournalReader creates a new JournalReader with configuration options that are similar to the |
| // systemd journalctl tool's iteration and filtering features. |
| func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) { |
| // use simpleMessageFormatter as default formatter. |
| if config.Formatter == nil { |
| config.Formatter = simpleMessageFormatter |
| } |
| |
| r := &JournalReader{ |
| formatter: config.Formatter, |
| } |
| |
| // Open the journal |
| var err error |
| if config.Path != "" { |
| r.journal, err = NewJournalFromDir(config.Path) |
| } else { |
| r.journal, err = NewJournal() |
| } |
| if err != nil { |
| return nil, err |
| } |
| |
| // Add any supplied matches |
| for _, m := range config.Matches { |
| if err = r.journal.AddMatch(m.String()); err != nil { |
| return nil, err |
| } |
| } |
| |
| // Set the start position based on options |
| if config.Since != 0 { |
| // Start based on a relative time |
| start := time.Now().Add(config.Since) |
| if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil { |
| return nil, err |
| } |
| } else if config.NumFromTail != 0 { |
| // Start based on a number of lines before the tail |
| if err := r.journal.SeekTail(); err != nil { |
| return nil, err |
| } |
| |
| // Move the read pointer into position near the tail. Go one further than |
| // the option so that the initial cursor advancement positions us at the |
| // correct starting point. |
| skip, err := r.journal.PreviousSkip(config.NumFromTail + 1) |
| if err != nil { |
| return nil, err |
| } |
| // If we skipped fewer lines than expected, we have reached journal start. |
| // Thus, we seek to head so that next invocation can read the first line. |
| if skip != config.NumFromTail+1 { |
| if err := r.journal.SeekHead(); err != nil { |
| return nil, err |
| } |
| } |
| } else if config.Cursor != "" { |
| // Start based on a custom cursor |
| if err := r.journal.SeekCursor(config.Cursor); err != nil { |
| return nil, err |
| } |
| } |
| |
| return r, nil |
| } |
| |
| // Read reads entries from the journal. Read follows the Reader interface so |
| // it must be able to read a specific amount of bytes. Journald on the other |
| // hand only allows us to read full entries of arbitrary size (without byte |
| // granularity). JournalReader is therefore internally buffering entries that |
| // don't fit in the read buffer. Callers should keep calling until 0 and/or an |
| // error is returned. |
| func (r *JournalReader) Read(b []byte) (int, error) { |
| if r.msgReader == nil { |
| // Advance the journal cursor. It has to be called at least one time |
| // before reading |
| c, err := r.journal.Next() |
| |
| // An unexpected error |
| if err != nil { |
| return 0, err |
| } |
| |
| // EOF detection |
| if c == 0 { |
| return 0, io.EOF |
| } |
| |
| entry, err := r.journal.GetEntry() |
| if err != nil { |
| return 0, err |
| } |
| |
| // Build a message |
| msg, err := r.formatter(entry) |
| if err != nil { |
| return 0, err |
| } |
| r.msgReader = strings.NewReader(msg) |
| } |
| |
| // Copy and return the message |
| sz, err := r.msgReader.Read(b) |
| if err == io.EOF { |
| // The current entry has been fully read. Don't propagate this |
| // EOF, so the next entry can be read at the next Read() |
| // iteration. |
| r.msgReader = nil |
| return sz, nil |
| } |
| if err != nil { |
| return sz, err |
| } |
| if r.msgReader.Len() == 0 { |
| r.msgReader = nil |
| } |
| |
| return sz, nil |
| } |
| |
| // Close closes the JournalReader's handle to the journal. |
| func (r *JournalReader) Close() error { |
| return r.journal.Close() |
| } |
| |
| // Rewind attempts to rewind the JournalReader to the first entry. |
| func (r *JournalReader) Rewind() error { |
| r.msgReader = nil |
| return r.journal.SeekHead() |
| } |
| |
| // Follow synchronously follows the JournalReader, writing each new journal entry to writer. The |
| // follow will continue until a single time.Time is received on the until channel. |
| func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error { |
| |
| // Process journal entries and events. Entries are flushed until the tail or |
| // timeout is reached, and then we wait for new events or the timeout. |
| var msg = make([]byte, 64*1<<(10)) |
| var waitCh = make(chan int, 1) |
| var waitGroup sync.WaitGroup |
| defer waitGroup.Wait() |
| |
| process: |
| for { |
| c, err := r.Read(msg) |
| if err != nil && err != io.EOF { |
| return err |
| } |
| |
| select { |
| case <-until: |
| return ErrExpired |
| default: |
| } |
| if c > 0 { |
| if _, err = writer.Write(msg[:c]); err != nil { |
| return err |
| } |
| continue process |
| } |
| |
| // We're at the tail, so wait for new events or time out. |
| // Holds journal events to process. Tightly bounded for now unless there's a |
| // reason to unblock the journal watch routine more quickly. |
| for { |
| waitGroup.Add(1) |
| go func() { |
| status := r.journal.Wait(100 * time.Millisecond) |
| waitCh <- status |
| waitGroup.Done() |
| }() |
| |
| select { |
| case <-until: |
| return ErrExpired |
| case e := <-waitCh: |
| switch e { |
| case SD_JOURNAL_NOP: |
| // the journal did not change since the last invocation |
| case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: |
| continue process |
| default: |
| if e < 0 { |
| return fmt.Errorf("received error event: %d", e) |
| } |
| |
| log.Printf("received unknown event: %d\n", e) |
| } |
| } |
| } |
| } |
| } |
| |
| // simpleMessageFormatter is the default formatter. |
| // It returns a string representing the current journal entry in a simple format which |
| // includes the entry timestamp and MESSAGE field. |
| func simpleMessageFormatter(entry *JournalEntry) (string, error) { |
| msg, ok := entry.Fields["MESSAGE"] |
| if !ok { |
| return "", fmt.Errorf("no MESSAGE field present in journal entry") |
| } |
| |
| usec := entry.RealtimeTimestamp |
| timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond)) |
| |
| return fmt.Sprintf("%s %s\n", timestamp, msg), nil |
| } |