Reducing a Mapped Univa Grid Engine Accounting File with Glow (2016/03/06)

This article describes how the Univa Grid Engine accounting file can be processed with Glow.

The Univa Grid Engine cluster scheduler spits out a text file containing information about the resource usage of jobs that ran in your cluster: the accounting file.

The accounting file is an easy-to-process ASCII text file consisting of one accounting record per line, with each record containing detailed information about a job's resource usage. Typically it is used with the qacct utility to get useful usage aggregations or a pretty-printed report of a job's resource usage. After a job has finished, a line (or, when configured, one line per host for parallel jobs) is appended to the file by the qmaster process.

Glow is a very interesting MapReduce framework written in Go which is very easy to use. It has two modes: a simple mode where the tasks are executed in goroutines, and a more scalable mode where one application can be executed on multiple machines.

In order to process the accounting file with Glow you need to make your MapReduce application aware of the location of the accounting file. Once you have sourced Grid Engine's settings.sh file, the path can be derived from the default environment variables pointing to the Grid Engine installation directory.

accountingFileName := fmt.Sprintf("%s/%s/common/accounting", os.Getenv("SGE_ROOT"), os.Getenv("SGE_CELL"))
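
For completeness, a minimal, self-contained sketch of deriving the path (assuming settings.sh has been sourced so that SGE_ROOT and SGE_CELL are set):

package main

import (
    "fmt"
    "os"
)

func main() {
    root := os.Getenv("SGE_ROOT")
    cell := os.Getenv("SGE_CELL")
    if root == "" || cell == "" {
        // settings.sh was not sourced - the path cannot be derived
        fmt.Println("SGE_ROOT or SGE_CELL is not set")
        return
    }
    accountingFileName := fmt.Sprintf("%s/%s/common/accounting", root, cell)
    fmt.Println(accountingFileName)
}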

In order to read text files with Glow you just need to point it to the file location and specify the number of shards to use.

flow.New().TextFile(
    accountingFileName, 4,
)

This returns a Glow Dataset structure.

Since the accounting file starts with a few comment lines we need to filter them out. All of them share a common prefix: #. Filtering lines with Glow is simple: it expects a function which decides for a given line whether it should be kept or dropped. This function can be anonymous:

flow.New().TextFile(
    accountingFileName, 4,
).Filter(func(line string) bool {
    // filter out all comments
    return !strings.HasPrefix(line, "#")
})

Filter also returns a Glow Dataset, which again has the same set of methods defined.

Now we can apply our first Mapper to the resulting Dataset. The Mapper should convert an accounting line into an accounting entry data structure. A while ago I wrote a parser which you can find here. The ParseLine() function is less than 30 lines of Go code thanks to Go's reflection API. Whether reflection should be used for such things is certainly a different debate: it is powerful, but it makes applications harder to debug. Glow itself makes heavy use of it (which, again, makes debugging harder).
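
To sketch the idea (this is not the actual ugego implementation, and the struct and its fields here are made up for illustration), a reflection-based parser can walk a struct's fields in order and fill them from the colon-separated columns of a record:

package main

import (
    "fmt"
    "reflect"
    "strconv"
    "strings"
)

// record is a made-up, shortened example structure; the real Entry in
// ugego contains many more fields in the order of the accounting file.
type record struct {
    QueueName string
    HostName  string
    JobName   string
    Slots     int64
    CPU       float64
}

// parseLine fills the struct fields in order from the colon-separated
// columns of a line, using reflection to pick the right conversion.
func parseLine(line []byte) (r record, err error) {
    fields := strings.Split(string(line), ":")
    v := reflect.ValueOf(&r).Elem()
    for i := 0; i < v.NumField() && i < len(fields); i++ {
        f := v.Field(i)
        switch f.Kind() {
        case reflect.String:
            f.SetString(fields[i])
        case reflect.Int64:
            n, cerr := strconv.ParseInt(fields[i], 10, 64)
            if cerr != nil {
                return r, cerr
            }
            f.SetInt(n)
        case reflect.Float64:
            x, cerr := strconv.ParseFloat(fields[i], 64)
            if cerr != nil {
                return r, cerr
            }
            f.SetFloat(x)
        }
    }
    return r, nil
}

func main() {
    // made-up, abbreviated example line
    r, err := parseLine([]byte("all.q:node1:sleeper:4:12.5"))
    if err == nil {
        fmt.Printf("%+v\n", r)
    }
}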

In order to compile your code you need Glow and the UGE parsing API (please create issues on GitHub if you find bugs).

go get github.com/chrislusf/glow
go get github.com/chrislusf/glow/flow
go get github.com/dgruber/ugego

After importing glow and ugego in your Go source, the ParseLine() function can be used in a new Mapper which is called on the filtered Dataset. This Mapper takes a string and a channel typed with the parsed accounting Entry data structure from the ugego package. The parsed Entries are written into this channel.

Map(func(line string, ch chan accounting.Entry) {
    // parse one accounting line and drop lines which cannot be parsed
    if e, err := accounting.ParseLine([]byte(line)); err == nil {
        ch <- e
    }
})

Here additional filtering can be injected in a very simple way. If you are only interested in jobs which ran in a specific time window, you can check for it with e.StartTime and e.EndTime (both are time.Time values) and only write the matching entries back into the channel.
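
A sketch of such a time-window filter inside the Mapper (begin and until are hypothetical time.Time variables assumed to be defined elsewhere):

Map(func(line string, ch chan accounting.Entry) {
    if e, err := accounting.ParseLine([]byte(line)); err == nil {
        // only pass on jobs which started and ended inside the window
        if e.StartTime.After(begin) && e.EndTime.Before(until) {
            ch <- e
        }
    }
})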

Finally the elements in the channel have to be reduced. That is the hard part. Reducing accounting entries would mean aggregating usage values, but that does not make sense for fields like the return value. For simplicity I'm only aggregating CPU time here, i.e. the used CPU seconds.

Reduce(func(x accounting.Entry, y accounting.Entry) accounting.Entry {
    y.CPU += x.CPU
    return y
})

Reducing other fields or building counters would require better-suited data structures. Go's embedding can be used for that. In any case, overflows need to be avoided.
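
For illustration, a hypothetical aggregate type (not part of ugego) which embeds accounting.Entry and adds explicit counters, so a Reduce function can accumulate several values without overloading the original fields:

// usageSummary is a made-up aggregate for reductions: it embeds the last
// reduced accounting.Entry and keeps its own, wider counters.
type usageSummary struct {
    accounting.Entry         // embedded entry, keeps access to its fields
    JobCount int64           // number of entries folded into this summary
    TotalCPU float64         // summed CPU seconds across all entries
}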

Finally we print the result from the reduced accounting entry:

Map(func(out accounting.Entry) {
    fmt.Println("Combined CPU times for all jobs.")
    fmt.Printf("%f\n", out.CPU)
}).Run()
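
Putting the fragments together, the whole pipeline reads roughly like the sketch below (the import path of the accounting package is an assumption and may differ in the ugego repository; everything else follows the snippets above):

package main

import (
    "fmt"
    "os"
    "strings"

    "github.com/chrislusf/glow/flow"
    "github.com/dgruber/ugego/accounting" // assumed import path
)

func main() {
    accountingFileName := fmt.Sprintf("%s/%s/common/accounting",
        os.Getenv("SGE_ROOT"), os.Getenv("SGE_CELL"))

    flow.New().TextFile(
        accountingFileName, 4,
    ).Filter(func(line string) bool {
        // filter out all comments
        return !strings.HasPrefix(line, "#")
    }).Map(func(line string, ch chan accounting.Entry) {
        if e, err := accounting.ParseLine([]byte(line)); err == nil {
            ch <- e
        }
    }).Reduce(func(x accounting.Entry, y accounting.Entry) accounting.Entry {
        y.CPU += x.CPU
        return y
    }).Map(func(out accounting.Entry) {
        fmt.Println("Combined CPU times for all jobs.")
        fmt.Printf("%f\n", out.CPU)
    }).Run()
}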

The full program can be found here.

PS: Using grouped aggregations is not much harder. For example, if you want to list the summed number of consumed slots per job name, you can create a flow.KeyValue and use ReduceByKey():


.Map(func(e accounting.Entry) flow.KeyValue {
		return flow.KeyValue{e.JobName, e.Slots}
}).ReduceByKey(func(s1, s2 int) int {
		return s1 + s2
}).Map(func(jobname string, slots int) {
		fmt.Printf("%s %d\n", jobname, slots)
}).Run()

PPS: Note that this can be simplified further by returning the key and value directly from the Mapper:


.Map(func(e accounting.Entry) (string, int) {
		return e.JobName, e.Slots
}).ReduceByKey(func(s1, s2 int) int {
		return s1 + s2
}).Map(func(jobname string, slots int) {
		fmt.Printf("%s %d\n", jobname, slots)
}).Run()