(This post is part one of a two-part technology series around building and using an IPMI ingester and kit. Part two coming soon.)
In many data aggregation and analysis tools, the ecosystem is fully closed source, and often even the data ingest protocols are proprietary. This means that if you want to ingest a novel data format of your own, you're either a) $%*! out of luck, or b) forced to collapse your data into some form of low-performance, textual, line-delimited data that a generic log ingester will accept.
At Gravwell HQ, we take a different approach. All of our ingesters are open source and freely available under a BSD license, and our ingest framework is open and available as a Go library. In this post, we’ll be taking a tour of how we wrote a real and officially supported Gravwell ingester: the new Gravwell IPMI Ingester. We’ll cover how we manage configuration files, set up and manage indexer connections, and transform IPMI data into a flexible JSON schema before sending it out.
How the uh... ingester is made.
This post assumes you have a working knowledge of the Go programming language and a basic understanding of what IPMI is--though we'll provide a quick overview just to make sure.
A Brief Introduction to IPMI
The Intelligent Platform Management Interface, or IPMI, is a "simple and elegant" interface (read: a huge pain to work with) that allows system administrators to manage and monitor computer systems out-of-band. It's usually used simply to control power states, read sensors such as CPU temperature and fan speed, and collect failure event logs from servers.
Data from IPMI comes in a variety of formats, including fully custom OEM formats that may or may not have documentation, though the majority of systems support two major data types: System Event Logs (SEL) and Sensor Data Records (SDR). These are the two we'll focus on for the remainder of this post.
SEL messages usually look something like this:
1 | 06/23/2020 | 00:00:32 | Power Supply #0xc9 | Failure detected () | Deasserted
2 | 06/26/2020 | 23:31:38 | Power Supply #0xc9 | Failure detected () | Asserted
3 | 06/26/2020 | 23:31:45 | Power Supply #0xc9 | Failure detected () | Deasserted
4 | 03/14/2021 | 04:24:25 | Power Supply #0xc9 | Failure detected () | Asserted
5 | 03/16/2021 | 23:07:46 | Power Supply #0xc9 | Failure detected () | Deasserted
In this example, we have 5 messages, each containing a timestamp, a device with a device ID (a power supply), a message (failure detected), and a "direction"--in this case deasserted/asserted. In our ingester, we'll split these messages into individual entries upon ingest.
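Just to make that structure concrete, here's a standalone sketch that pulls one of those lines apart. This is illustration only: as we'll see, the real ingester receives these fields pre-parsed from an IPMI library rather than splitting text.

package main

import (
    "fmt"
    "strings"
)

func main() {
    // A SEL line as printed above; the fields are pipe-delimited.
    line := `4 | 03/14/2021 | 04:24:25 | Power Supply #0xc9 | Failure detected () | Asserted`
    f := strings.Split(line, " | ")
    // f[0]=record ID, f[1]=date, f[2]=time, f[3]=device, f[4]=message, f[5]=direction
    fmt.Printf("record=%s device=%q message=%q direction=%s\n", f[0], f[3], f[4], f[5])
}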
SDR readings similarly can be all over the place (including obscure OEM special messages), but are typically formatted into just a few elements:
CPU1 Vcore | 0.86 Volts | ok
CPU2 Vcore | 0.78 Volts | ok
VDIMM AB | 1.49 Volts | ok
VDIMM CD | 1.49 Volts | ok
VDIMM EF | 1.49 Volts | ok
VDIMM GH | 1.49 Volts | ok
3.3V | 3.12 Volts | ok
+3.3VSB | 3.26 Volts | ok
5V | 4.93 Volts | ok
+5VSB | 5.06 Volts | ok
12V | 11.98 Volts | ok
VBAT | 3.12 Volts | ok
Chassis Intru | 0x01 | ok
PS1 Status | 0x01 | ok
PS2 Status | 0x01 | ok
Here we have a sensor, a reading, and a final message that tells us if the reading is within some specified threshold. Thresholds can be set in IPMI as well via yet another interface. Notice the “reading” for things like “PS1 Status”; you may wonder what “0x01” means for a power supply status, and you’d have a very good question on your hands.
Sidenote: Although it’s possible to have Gravwell control IPMI power states, etc., it’s not exactly the duty of an ingester to do that, so we omit that level of control entirely.
Goals of our IPMI Ingester
As we discussed above, IPMI provides myriad data points, including serial numbers, inventory-related information, and so on. It additionally provides mechanisms to control infrastructure remotely. For our ingester, all we care about is information that potentially changes over time: System Event Logs (SEL) and Sensor Data Records (SDR).
That reduces the complexity of our ingester quite a bit--we can just actively poll each IPMI target for all of its SEL and SDR data, translate it to our schema (more on that later), and throw it at Gravwell.
Our IPMI Ingester will need two things: a connection to one or more indexers (the Gravwell ingest library manages all of that for us), and access to the IPMI interfaces on all of the devices we intend to connect to. IPMI is usually deployed on an out-of-band management network, so the ingester will likely need to be on a dual-homed server to get access to everything.
Finally, we’ll treat SDR and SEL data slightly differently. SDR polling will be grouped into a single big entry of “all the sensor values at the time of polling”, while SEL records will be split into individual entries. SEL records already provide their own timestamps (well, actually only some do, because OEM SEL types can do literally anything they want), so we’ll make use of that when possible.
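Sketched as a runnable toy program--pollSDR, pollNewSEL, ingest, and pollPeriod are all placeholder names, with the real versions appearing later in this post--the per-target design boils down to a poll loop:

package main

import (
    "fmt"
    "time"
)

// Placeholder implementations, purely to illustrate the shape of the design.
func pollSDR() []byte      { return []byte(`{"Type":"SDR","Data":"..."}`) }
func pollNewSEL() [][]byte { return [][]byte{[]byte(`{"Type":"SEL","Data":"..."}`)} }

func ingest(b []byte, t time.Time) { fmt.Printf("%d %s\n", t.Unix(), b) }

func main() {
    const pollPeriod = 100 * time.Millisecond
    for i := 0; i < 3; i++ { // the real loop just spins forever
        ingest(pollSDR(), time.Now()) // all sensor values grouped into one entry
        for _, rec := range pollNewSEL() { // only SEL records we haven't seen yet
            ingest(rec, time.Now()) // the real code prefers the record's own timestamp
        }
        time.Sleep(pollPeriod)
    }
}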
Data Format
We’ll be using JSON as the encoding for our entries, and we’ll be wrapping our SEL and SDR messages in a very simple container format. In Go, our SEL and SDR types look like this:
type tSEL struct {
    Target string
    Type   string
    Data   ipmigo.SELRecord
}

type tSDR struct {
    Target string
    Type   string
    Data   map[string]*tSDRData
}

type tSDRData struct {
    Type    string
    Reading string
    Units   string
    Status  string
}
Both types contain some "meta" fields: "Target" (the IP:Port of the IPMI device the record came from) and "Type" (either "SEL" or "SDR" in this implementation). We may add more formats at another time, but we keep it simple here and just use a string for the Type.
For SEL messages, we package the actual message using the underlying types of the third-party IPMI library we'll be using. There are several SEL message types defined, and OEM providers can create their own, so we just let the library take care of the details for us.
The SDR format is simpler. We send up a map of sensor values (type, reading, units, status), organized by name. This also has the advantage of making extractions with Gravwell’s JSON module easy.
Here are a few examples of what an encoded JSON message may look like:
{
    "Type": "SDR",
    "Target": "10.10.10.10:623",
    "Data": {
        "+3.3VSB": {
            "Type": "Voltage",
            "Reading": "3.26",
            "Units": "Volts",
            "Status": "ok"
        },
        "+5VSB": {...},
        "12V": {...}
    }
}

{
    "Target": "10.10.10.10:623",
    "Type": "SEL",
    "Data": {
        "RecordID": 25,
        "RecordType": 2,
        "Timestamp": {
            "Value": 1506550240
        },
        "GeneratorID": 32,
        "EvMRev": 4,
        "SensorType": 5,
        "SensorNumber": 81,
        "EventType": 111,
        "EventDir": 0,
        "EventData1": 240,
        "EventData2": 255,
        "EventData3": 255
    }
}
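To see how little glue this takes, here's a minimal, self-contained program--using only the types defined above--that builds an SDR record and marshals it into JSON shaped like the first example:

package main

import (
    "encoding/json"
    "fmt"
)

type tSDRData struct {
    Type    string
    Reading string
    Units   string
    Status  string
}

type tSDR struct {
    Target string
    Type   string
    Data   map[string]*tSDRData
}

func main() {
    rec := tSDR{
        Target: "10.10.10.10:623",
        Type:   "SDR",
        Data: map[string]*tSDRData{
            "+3.3VSB": {Type: "Voltage", Reading: "3.26", Units: "Volts", Status: "ok"},
        },
    }
    b, err := json.Marshal(rec)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(b)) // one SDR poll, ready to hand to the ingest library
}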
Looks like we have everything we need to get started -- let’s dive in!
Building the Ingester
Note: The full implementation of this ingester is available on Gravwell's GitHub page for you to follow along.
This will be a pretty simple ingester, mostly because we're leaving the heavy lifting of actually interfacing with IPMI to a third-party package: Keita Sone's excellent ipmigo package. Essentially, all we're doing in this ingester is providing some glue logic to put ipmigo and Gravwell's ingest libraries together.
Since this will be an official Gravwell Ingester, we will have some verbose error checking and configuration rigmarole, but the whole program will still be just a few hundred lines of code by the end.
We’ll organize the project in two files:
config.go -- config file processing
main.go -- everything else
Easy enough.
Configuration
Gravwell uses a simple configuration library to process configuration files. All we have to do is provide a struct with member names that translate to configuration file parameters and we’re done. A complete configuration file will look something like this:
[Global]
Ingest-Secret = IngestSecrets
Connection-Timeout = 0
Insecure-Skip-TLS-Verify=false
#Cleartext-Backend-Target=127.0.0.1:4023 #example of adding a cleartext connection
#Cleartext-Backend-Target=127.1.0.1:4023 #example of adding another cleartext connection
#Encrypted-Backend-Target=127.1.1.1:4024 #example of adding an encrypted connection
Pipe-Backend-Target=/opt/gravwell/comms/pipe #a named pipe connection, this should be used when ingester is on the same machine as a backend
#Ingest-Cache-Path=/opt/gravwell/cache/ipmi.cache #adding an ingest cache for local storage when uplinks fail
#Max-Ingest-Cache=1024 #Number of MB to store, localcache will only store 1GB before stopping. This is a safety net
Log-Level=INFO
Log-File=/opt/gravwell/log/ipmi.log
[IPMI "Server 1"]
Target="127.0.0.1:623"
Username="user"
Password="pass"
Tag-Name=ipmi
#Source-Override="DEAD::BEEF" #override the source for just this IPMI stanza
All of the items in the “Global” section are already provided for us by the Gravwell configuration library. We just need to let the library know that we want to add a custom “stanza” for IPMI configurations. We do that by defining a few structs in config.go:
type ipmi struct {
    Tag_Name          string
    Target            string
    Username          string
    Password          string
    Preprocessor      []string
    Source_Override   string
    Ignore_Timestamps bool // Just apply the current timestamp to records as we get them
}

type cfgType struct {
    Global       config.IngestConfig
    IPMI         map[string]*ipmi
    Preprocessor processors.ProcessorConfig
}
We've defined a struct "ipmi" that has all of the fields of the IPMI stanza from our config file -- the configuration library actually uses the field names of our struct to define the config file parameters. Field names with underscores become hyphenated in the config file, so "Tag_Name" becomes "Tag-Name". Our top-level struct, "cfgType", has a Global member which holds all of the global options that are predefined for a Gravwell ingester; a map[string]*ipmi member, which becomes the "[IPMI foo]" stanza that we see in Gravwell config files; and a Preprocessor member, again a predefined Gravwell configuration option, which will allow us to define "[Preprocessor foo]" stanzas.

Next, we'll create a "GetConfig" function that our project will use to get a populated cfgType struct:
func GetConfig(path string) (*cfgType, error) {
    var c cfgType
    if err := config.LoadConfigFile(&c, path); err != nil {
        return nil, err
    }
    if err := verifyConfig(&c); err != nil {
        return nil, err
    }

    // Verify and set UUID
    if _, ok := c.Global.IngesterUUID(); !ok {
        id := uuid.New()
        if err := c.Global.SetIngesterUUID(id, path); err != nil {
            return nil, err
        }
        if id2, ok := c.Global.IngesterUUID(); !ok || id != id2 {
            return nil, errors.New("Failed to set a new ingester UUID")
        }
    }
    return &c, nil
}
This one is also pretty simple -- we call the config library's LoadConfigFile() function with a reference to an empty struct and the path to the config file (the path is set in main.go, which we'll see later). We then call a verify function, which we'll look at next, and double-check that the config file has a UUID declared, generating one and writing it back to the file if not. Gravwell ingesters must have a UUID to communicate with indexers; it's expected that ingesters ship with the UUID unset in their default configs and simply let the config library generate one.
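After the first run, you should find the generated UUID persisted in the [Global] section of the config file. The exact key name comes from the config library, and the value below is just an example:

[Global]
Ingester-UUID="0f1e2d3c-4b5a-6978-8796-a5b4c3d2e1f0"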
The verifyConfig function walks key elements of the returned populated struct to make sure it's sane:
func verifyConfig(c *cfgType) error {
    // verify the global parameters
    if err := c.Global.Verify(); err != nil {
        return err
    }
    if len(c.IPMI) == 0 {
        return errors.New("No IPMI targets specified")
    }
    if err := c.Preprocessor.Validate(); err != nil {
        return err
    }
    for k, v := range c.IPMI {
        if len(v.Tag_Name) == 0 {
            v.Tag_Name = `default`
        }
        if strings.ContainsAny(v.Tag_Name, ingest.FORBIDDEN_TAG_SET) {
            return errors.New("Invalid characters in the Tag-Name for " + k)
        }
        if err := c.Preprocessor.CheckProcessors(v.Preprocessor); err != nil {
            return fmt.Errorf("Listener %s preprocessor invalid: %v", k, err)
        }
    }
    return nil
}
All of the checks here are straightforward -- make sure we have at least one IPMI stanza, make sure each tag name is set (or give it a default) and does not contain any of the forbidden characters, and make sure our preprocessors are correctly defined (the config library provides helper functions for these checks).
Ingest Muxer
With our config file defined, we’ll move on to our main function. But first, another struct:
type handlerConfig struct {
    target           string
    username         string
    password         string
    tag              entry.EntryTag
    src              net.IP
    wg               *sync.WaitGroup
    proc             *processors.ProcessorSet
    ctx              context.Context
    client           *ipmigo.Client
    SELIDs           map[uint16]bool
    ignoreTimestamps bool
}
Each "IPMI" stanza will have its own goroutine to independently connect, manage, and ingest data. We'll organize individual connections in a map[string]*handlerConfig, where each entry carries the configuration options from its stanza, as well as a handle to an IPMI client from ipmigo and a few housekeeping items like a context and a waitgroup.
Back to our main function. The whole program is small enough that we can keep most of it in main() proper, and we'll organize it into three sections: reading the configuration file, setting up the ingester connections, and starting our IPMI clients.
cfg, err := GetConfig(*confLoc)
if err != nil {
    lg.FatalCode(0, "Failed to get configuration: %v\n", err)
    return
}
if len(cfg.Global.Log_File) > 0 {
    fout, err := os.OpenFile(cfg.Global.Log_File, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0640)
    if err != nil {
        lg.FatalCode(0, "Failed to open log file %s: %v", cfg.Global.Log_File, err)
    }
    if err = lg.AddWriter(fout); err != nil {
        lg.Fatal("Failed to add a writer: %v", err)
    }
    if len(cfg.Global.Log_Level) > 0 {
        if err = lg.SetLevelString(cfg.Global.Log_Level); err != nil {
            lg.FatalCode(0, "Invalid Log Level \"%s\": %v", cfg.Global.Log_Level, err)
        }
    }
}
Configuration is easy since we've already done most of the work in config.go. We just call our GetConfig() function from earlier and set up any logging specified in the config file. Gravwell's ingest library also provides a logger, so we'll use that.
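For completeness, the logger comes from the ingest library's log package and is declared before any of this runs -- roughly like this (a sketch; the exact import path depends on the version of the ingest library you're building against):

import (
    "os"

    "github.com/gravwell/gravwell/v3/ingest/log"
)

// lg writes to stderr until AddWriter() attaches the configured log file.
var lg = log.New(os.Stderr)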
Next, we create the indexer connections. Again, the Gravwell ingest library does most of the work; we just have to populate a UniformMuxerConfig (read: "Gravwell connection") and away we go.
id, ok := cfg.Global.IngesterUUID()
if !ok {
    lg.FatalCode(0, "Couldn't read ingester UUID\n")
}
igCfg := ingest.UniformMuxerConfig{
    IngestStreamConfig: cfg.Global.IngestStreamConfig,
    Destinations:       conns,
    Tags:               tags,
    Auth:               cfg.Global.Secret(),
    LogLevel:           cfg.Global.LogLevel(),
    VerifyCert:         !cfg.Global.InsecureSkipTLSVerification(),
    IngesterName:       ingesterName,
    IngesterVersion:    version.GetVersion(),
    IngesterUUID:       id.String(),
    IngesterLabel:      cfg.Global.Label,
    RateLimitBps:       lmt,
    Logger:             lg,
    CacheDepth:         cfg.Global.Cache_Depth,
    CachePath:          cfg.Global.Ingest_Cache_Path,
    CacheSize:          cfg.Global.Max_Ingest_Cache,
    CacheMode:          cfg.Global.Cache_Mode,
    LogSourceOverride:  net.ParseIP(cfg.Global.Log_Source_Override),
}
igst, err = ingest.NewUniformMuxer(igCfg)
if err != nil {
    lg.Fatal("Failed to build our ingest system: %v\n", err)
    return
}
defer igst.Close()
if err := igst.Start(); err != nil {
    lg.Fatal("Failed to start our ingest system: %v\n", err)
    return
}
if err := igst.WaitForHot(cfg.Global.Timeout()); err != nil {
    lg.FatalCode(0, "Timed out waiting for backend connections: %v\n", err)
    return
}
The ingest configuration takes all of the connection targets specified in the config file and handles load balancing, reconnection, caching, and everything else specified in the "Global" section for us. After creating the config, we call Start(), which returns immediately; the ingest library then attempts to connect to all of the specified targets in the background. The WaitForHot() call simply blocks until we have at least one indexer connection ready.
Next, we populate a handlerConfig object for each of our "IPMI" stanzas and call run() on each one:
var wg sync.WaitGroup
ipmiConns = make(map[string]*handlerConfig)
ctx, cancel := context.WithCancel(context.Background())

for k, v := range cfg.IPMI {
    var src net.IP
    if v.Source_Override != `` {
        src = net.ParseIP(v.Source_Override)
        if src == nil {
            lg.FatalCode(0, "Listener %v invalid source override, \"%s\" is not an IP address", k, v.Source_Override)
        }
    } else if cfg.Global.Source_Override != `` {
        // global override
        src = net.ParseIP(cfg.Global.Source_Override)
        if src == nil {
            lg.FatalCode(0, "Global Source-Override is invalid")
        }
    }

    // get the tag for this listener
    tag, err := igst.GetTag(v.Tag_Name)
    if err != nil {
        lg.Fatal("Failed to resolve tag \"%s\" for %s: %v\n", v.Tag_Name, k, err)
    }

    hcfg := &handlerConfig{
        target:           v.Target,
        username:         v.Username,
        password:         v.Password,
        tag:              tag,
        src:              src,
        wg:               &wg,
        ctx:              ctx,
        SELIDs:           make(map[uint16]bool),
        ignoreTimestamps: v.Ignore_Timestamps,
    }
    if hcfg.proc, err = cfg.Preprocessor.ProcessorSet(igst, v.Preprocessor); err != nil {
        lg.Fatal("Preprocessor failure: %v", err)
    }
    ipmiConns[k] = hcfg
}

for _, v := range ipmiConns {
    go v.run()
}
Finally, we just wait for a signal to exit and call some obligatory cleanup code, leaving IPMI handlers to spin and ingest:
utils.WaitForQuit()
cancel()

if err := igst.Sync(time.Second); err != nil {
    lg.Error("Failed to sync: %v\n", err)
}
if err := igst.Close(); err != nil {
    lg.Error("Failed to close: %v\n", err)
}
IPMI Connections
Each IPMI handler establishes a connection to an IPMI device and then spins forever, reading and ingesting:
func (h *handlerConfig) run() {
    var err error

    for {
        h.client, err = ipmigo.NewClient(ipmigo.Arguments{
            Version:       ipmigo.V2_0,
            Address:       h.target,
            Username:      h.username,
            Password:      h.password,
            CipherSuiteID: 3,
        })
        if err != nil {
            lg.Error("Failed to create a client for %v: %v", h.target, err)
            time.Sleep(PERIOD)
            continue
        }
        if err := h.client.Open(); err != nil {
            lg.Error("Failed to connect to %v: %v", h.target, err)
            time.Sleep(PERIOD)
            continue
        }
        defer h.client.Close()
        ...
The first part of our run() function opens a new connection via the ipmigo library, retrying after a sleep if there are any errors. Next, we descend into another for loop for reading IPMI records:
// successful connection, spin on getting records
for {
    // grab all SDR records and throw them as a single entry
    sdr, err := h.getSDR()
    if err != nil {
        lg.Error("%v", err)
    } else {
        ent := &entry.Entry{
            SRC:  h.src,
            TS:   entry.Now(),
            Tag:  h.tag,
            Data: sdr,
        }
        if err = h.proc.ProcessContext(ent, h.ctx); err != nil {
            lg.Error("Sending message: %v", err)
        }
    }
In our read loop, we grab all SDR records and throw them at the ingester after packing them into an entry. The Data field is just a []byte; we'll let the indexer figure out what to do with it, either through an accelerator or at query time with a module. An entry has four fields -- SRC, TS, Tag, and Data -- which map directly to the values present on every entry when working with a query. Also note the ProcessContext() call. In a Gravwell ingester connection, we pass all entries through this function so that any preprocessors defined for this handler are invoked before the data is written to the indexer. If there are no preprocessors defined, then this function simply writes the data, unmodified, to the indexer.
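As an aside, if a one-off ingester has no need for preprocessors, entries can be handed straight to the muxer. A sketch, assuming the igst muxer handle from main() is in scope and that you're using the muxer's plain WriteEntry method:

// Bypass the preprocessor chain entirely and write directly to the
// ingest muxer (a sketch of the simpler path, not what this ingester does).
ent := &entry.Entry{
    SRC:  h.src,
    TS:   entry.Now(),
    Tag:  h.tag,
    Data: sdr,
}
if err := igst.WriteEntry(ent); err != nil {
    lg.Error("Sending message: %v", err)
}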
We do much the same for SEL records:
    // grab all SEL records that we haven't already seen
    sel, err := h.getSEL()
    if err != nil {
        lg.Error("%v", err)
    } else {
        // throw them as individual entries
        for _, v := range sel {
            b, err := json.Marshal(v)
            if err != nil {
                lg.Error("Encoding SEL record: %v", err)
                continue
            }
            var ts entry.Timestamp
            if h.ignoreTimestamps {
                ts = entry.Now()
            } else {
                switch s := v.Data.(type) {
                case *ipmigo.SELEventRecord:
                    ts = entry.UnixTime(int64(s.Timestamp.Value), 0)
                case *ipmigo.SELTimestampedOEMRecord:
                    ts = entry.UnixTime(int64(s.Timestamp.Value), 0)
                default:
                    // other types just don't have a timestamp
                    ts = entry.Now()
                }
            }
            ent := &entry.Entry{
                SRC:  h.src,
                TS:   ts,
                Tag:  h.tag,
                Data: b,
            }
            if err = h.proc.ProcessContext(ent, h.ctx); err != nil {
                lg.Error("Sending message: %v", err)
            }
        }
    }
The main difference here is that we split SEL records into individual entries, using the timestamp that a record may carry as the TS field of the entry. If the timestamp cannot be inferred, or the Ignore-Timestamps flag is set, we use entry.Now() instead.
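That flag is set per stanza in the config file. Following the underscore-to-hyphen rule from config.go, the Ignore_Timestamps field we defined earlier surfaces like so:

[IPMI "Server 2"]
Target="10.0.0.5:623"
Username="user"
Password="pass"
Tag-Name=ipmi
Ignore-Timestamps=true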
Testing
Let’s compile and fire up our ingester now and see what we get. I’ve run mine for a few days against a few test servers that support IPMI. A simple “tag=ipmi” gets us:
Not bad, and Gravwell correctly identifies the data as JSON and gives us a nifty JSON-expansion view.
Let’s see if we can tease out some plots as well. We have 4 servers in this dataset, so let’s plot CPU 1 temperature over the last week. All of our sensor values are in a JSON object, so we can extract them by quoting the name in a dotted extraction:
tag=ipmi json Target Data."CPU1 Temp".Reading
We’ll do a little math to get a smooth mean from the dataset, and then chart by Target:
tag=ipmi json Target Data."CPU1 Temp".Reading
| stats mean(Reading) by Target
| chart mean by Target
Not bad! Though now I need to go check out what happened to the node in red...
Conclusion
And that's it! Some code was elided in this post, in particular the getSEL and getSDR functions, which can be found in the full implementation on Gravwell's GitHub page. Gravwell ingesters are very straightforward to create; in fact, most of the code presented here was a combination of boilerplate and configuration specification. For one-off or custom applications, an ingester implementation can be even smaller. Try it out for yourself! Click the button below to get started with our free Community Edition.