Anomaly Detection using Sigma Rules (Part 4): Flux Capacitor Design

We implement a Spark structured streaming stateful mapping function to handle temporal proximity correlations in cyber security logs

Jean-Claude Cote
Towards Data Science


Image by Robert Wilson from Pixabay

This is the fourth article of our series. Refer to part 1, part 2 and part 3 for some context.

In this article, we will detail the design of a custom Spark flatMapGroupsWithState function. We chose to write this function in Scala since Spark is itself written in Scala. We named this function Flux Capacitor. Electric capacitors accumulate electric charges and later release them. Similarly, our flatMapGroupsWithState function will accumulate tags (evaluated true/false Sigma expressions) and later release them.

Our Flux Capacitor function is easy to configure and lets the user specify how and when each individual tag is stored and retrieved. In our implementation, we encapsulated the tag updating and retrieving behavior in a tag evaluator class hierarchy.

Tag Evaluators

Unless otherwise specified, all tags are evaluated by the transitory evaluator. This evaluator is a no-op: it simply passes the current tag value through. The cacheable evaluator is a base class capable of storing and retrieving tags in the bloom filter. This base class has two template methods, makeBloomPutKey and makeBloomGetKey, letting sub-classes dictate how tags are stored and retrieved. The default evaluator uses the same key to get and put values in the bloom. The key is simply the concatenation of the rule name and the tag name. When a user passes the following specification to the Flux Capacitor, default tag evaluator objects are created to handle the storing and retrieval of the tags.

rules:
  - rulename: rule5
    description: un-ordered set of events by host
    action: temporal
    ordered: false
    tags:
      - name: recon_cmd_a
      - name: recon_cmd_b
      - name: recon_cmd_c

We designed our Flux Capacitor specification to resemble Sigma rules. However, this specification only describes how tags are handled in the Flux Capacitor.

The ordered evaluator specializes the default evaluator by introducing a “dependent tag”. The dependent tag has to be evaluated first, and only when the dependent tag is true will the ordered tag be in turn evaluated. This evaluator is used when the user specifies ordered: true.
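To make the hierarchy described so far more concrete, here is a minimal sketch of the transitory, cacheable, default and ordered evaluators. It is not our actual implementation: the TagStore class is a hypothetical exact set standing in for the bloom filter, a row is reduced to a map of tag name to tag value, and the "." separator in the key is an assumption. Only the names makeBloomPutKey and makeBloomGetKey come from the text above.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the bloom filter: an exact set of keys.
// (A real bloom filter is probabilistic and may report false positives.)
final class TagStore {
  private val keys = mutable.Set.empty[String]
  def put(key: String): Unit = { keys += key }
  def mightContain(key: String): Boolean = keys.contains(key)
}

// A row is reduced here to its tag values (tag name -> true/false).
trait TagEvaluator {
  def evaluate(row: Map[String, Boolean]): Boolean
}

// No-op: passes the current tag value through.
final class TransitoryEvaluator(tag: String) extends TagEvaluator {
  def evaluate(row: Map[String, Boolean]): Boolean = row.getOrElse(tag, false)
}

// Base class: stores true tags and looks them up again, leaving the
// choice of keys to the two template methods.
abstract class CacheableEvaluator(val tag: String, store: TagStore) extends TagEvaluator {
  protected def makeBloomPutKey: String
  protected def makeBloomGetKey: String

  def evaluate(row: Map[String, Boolean]): Boolean = {
    val current = row.getOrElse(tag, false)
    if (current) store.put(makeBloomPutKey)
    current || store.mightContain(makeBloomGetKey)
  }
}

// Same key for put and get: rule name and tag name ("." separator assumed).
class DefaultEvaluator(rule: String, tagName: String, s: TagStore)
    extends CacheableEvaluator(tagName, s) {
  protected def makeBloomPutKey: String = rule + "." + tag
  protected def makeBloomGetKey: String = makeBloomPutKey
}

// Only evaluated (and stored) once the dependent tag is true.
final class OrderedEvaluator(r: String, t: String, s: TagStore, dependent: TagEvaluator)
    extends DefaultEvaluator(r, t, s) {
  override def evaluate(row: Map[String, Boolean]): Boolean =
    dependent.evaluate(row) && super.evaluate(row)
}
```

In this sketch, if recon_cmd_b depends on recon_cmd_a, a row where only recon_cmd_b is true evaluates to false until some earlier row has set recon_cmd_a. After that, recon_cmd_b is stored and stays true on subsequent rows.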

The other side of the hierarchy handles parent/child relationships. Unlike the default evaluator, the parent evaluator’s put key is made of rule name + tag name + parent id and the get key is made of rule name + tag name + current id. The specification for parent/child lets the user specify which column name holds the parent ID and which column name holds the current message ID. In this example, parent_id and id are the columns representing a parent/child relationship.

rules:
  - rulename: rule4
    description: propagate to child only
    action: parent
    parent: parent_id
    child: id
    tags:
      - name: parent_process_feature1
As explained in our previous article, the ancestor evaluator specializes the parent evaluator by storing itself in the bloom, thus propagating itself down the parent/child hierarchy.

Rule Evaluators

So far we’ve only applied one Sigma rule at a time. We did this intentionally to keep things simple. However, in reality we want to apply multiple Sigma rules simultaneously. When we process a log source and parse its contents, we want to apply all Sigma rules that apply to it.

To keep things organized, we represent each rule and its associated tags in a column of type MapType. The keys in this map are the rule names and the values are maps of tag name to tag value (true/false).

select

  -- regroup each rule's tags in a map (ruleName -> Tags)
  map(
    'rule1', map('cr1_selection', cr1_selection,
                 'cr1_filter_empty', cr1_filter_empty,
                 'cr1_filter', cr1_filter,
                 'cr1_filter_localserver_fp', cr1_filter_localserver_fp,
                 'pr1_filter_iexplorer', pr1_filter_iexplorer,
                 'pr1_filter_msiexec_syswow64', pr1_filter_msiexec_syswow64,
                 'pr1_filter_msiexec_system32', pr1_filter_msiexec_system32),
    'rule2', map('cr2_filter_provider', cr2_filter_provider,
                 'cr2_filter_git', cr2_filter_git,
                 'pr2_selection', pr2_selection,
                 'pr2_filter_git', pr2_filter_git),
    'rule3', map('cr3_selection_other', cr3_selection_other,
                 'cr3_selection_atexec', cr3_selection_atexec,
                 'pr3_selection_other', pr3_selection_other,
                 'pr3_selection_atexec', pr3_selection_atexec)
  ) as sigma

By using maps, we can keep the schema of the input and output rows constant. Any new Sigma rule that is later introduced will simply be added to this map and will not affect the overall input and output row schemas.

Our Flux Capacitor specification reflects this. The specification applies to a list of rule names. Each rule specification states how to update the tags for that rule.

rules:
  - rulename: rule1
    description: ordered list of events by host
    action: temporal
    ordered: true
    tags:
      - name: recon_cmd_a
      - name: recon_cmd_b
      - name: recon_cmd_c

  - rulename: rule3
    description: propagate to all descendants
    action: ancestor
    tags:
      - name: ancestor_process_feature1
    parent: parent_id
    child: id

In our implementation, each rule specification is handled by a class Rule. The Rules class evaluates each Rule in turn.
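The way a collection of rules walks over the map of rule name to tags can be sketched as follows. This is an illustration under simplifying assumptions, not our actual Rule and Rules classes: every tag behaves like an un-ordered temporal tag, a mutable set stands in for the bloom filter, and the class shapes and the "." key separator are hypothetical.

```scala
import scala.collection.mutable

object RulesSketch {
  type Tags = Map[String, Boolean]

  // Hypothetical rule: remembers every tag it has seen true
  // (un-ordered temporal behaviour), keyed by rule name and tag name.
  final class Rule(val name: String, tagNames: Seq[String], seen: mutable.Set[String]) {
    def evaluate(tags: Tags): Tags =
      tagNames.map { tag =>
        val key = name + "." + tag
        if (tags.getOrElse(tag, false)) seen.add(key)
        tag -> seen.contains(key)
      }.toMap
  }

  // Evaluates each configured rule in turn. Entries of the map that have
  // no matching rule specification are passed through unchanged.
  final class Rules(rules: Seq[Rule]) {
    def evaluate(sigma: Map[String, Tags]): Map[String, Tags] =
      rules.foldLeft(sigma) { (acc, rule) =>
        acc.get(rule.name) match {
          case Some(tags) => acc.updated(rule.name, tags ++ rule.evaluate(tags))
          case None       => acc
        }
      }
  }
}
```

Because the input and the output are both a map of rule name to tags, adding a rule to the list does not change the shape of the rows.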

The Mapping Function

Spark's flatMapGroupsWithState, called on a grouped Dataset, invokes a user-provided function on each group while maintaining a user-defined per-group state between invocations. The function takes a key, an iterator over the group's input rows and a state, and returns an iterator of output rows. Our Flux Capacitor function is as follows:

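import java.nio.charset.StandardCharsets

import com.google.common.hash.{BloomFilter, Funnels}
import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.GroupState

// FluxState is the per-group state type; it is assumed here to be
// (an alias of) the Guava bloom filter type BloomFilter[CharSequence].
case class FluxCapacitorMapFunction(
    tagCapacity: Int,
    specification: String
) {

  def processBatch(
      key: String,
      rows: Iterator[Row],
      state: GroupState[FluxState]
  ): Iterator[Row] = {

    // reuse this group's bloom, or create one on the first invocation
    val bloom =
      if (state.exists) {
        state.get
      } else {
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), tagCapacity, 0.01)
      }

    val rulesConf = RulesConf.load(specification)
    val rules = new RulesAdapter(new Rules(rulesConf, bloom))
    // materialize the output so every row is evaluated before the state is saved
    val outputRows = rows.map(row => rules.evaluateRow(row)).toList
    state.update(bloom)
    outputRows.iterator
  }
}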

The first time the function is called for a group, there is no state. In that case, we create a Guava bloom filter with a false positive probability of 1%.
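To get a feel for what a capacity and a 1% false positive probability imply, the textbook bloom filter sizing formulas can be evaluated directly. The sketch below is only an estimate: the object and method names are hypothetical, and Guava's internal rounding may differ slightly from these formulas.

```scala
object BloomSizing {
  // Number of bits m for n expected insertions and false positive
  // probability p: m = -n * ln(p) / (ln 2)^2
  def optimalNumOfBits(n: Long, p: Double): Long =
    math.ceil(-n * math.log(p) / (math.log(2) * math.log(2))).toLong

  // Number of hash functions k = (m / n) * ln 2, at least 1
  def optimalNumOfHashes(n: Long, m: Long): Int =
    math.max(1, math.round(m.toDouble / n * math.log(2)).toInt)
}
```

For an example capacity of 200,000 tags at 1%, this gives roughly 1.9 million bits (about 234 KiB) and 7 hash functions per group.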

We then load the specification from a YAML string. We will see later how the user passes this specification to the function. The specification is parsed and provided to the Rules class. The Rules class creates the corresponding tag evaluators.

Next, we apply evaluateRow to every input row of this group, resulting in a list of output rows.

Invoking evaluateRow modifies the bloom filter (some of the tags will have been stored in the bloom). Thus, we persist the bloom using state.update(bloom).

Finally, we return an iterator to the output rows.
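The contract between the mapping function and its per-group state can be illustrated without Spark. In the sketch below, SimpleState and Driver are hypothetical stand-ins for Spark's GroupState and for the streaming engine, rows are plain strings (command names) and the state is the set of commands seen so far for a host. It only mimics the shape of the call; it says nothing about how Spark schedules or checkpoints state.

```scala
import scala.collection.mutable

object GroupStateSketch {
  // Hypothetical stand-in for Spark's GroupState: one slot per group key.
  final class SimpleState[S] {
    private var value: Option[S] = None
    def exists: Boolean = value.isDefined
    def get: S = value.get
    def update(s: S): Unit = { value = Some(s) }
  }

  // Same shape as processBatch: (key, input rows, state) => output rows.
  def processBatch(
      key: String,
      rows: Iterator[String],
      state: SimpleState[Set[String]]
  ): Iterator[String] = {
    var seen = if (state.exists) state.get else Set.empty[String]
    val out = rows.map { cmd =>
      seen += cmd
      key + " saw " + seen.toList.sorted.mkString(",")
    }.toList // evaluate every row before saving the state
    state.update(seen)
    out.iterator
  }

  // Minimal driver: groups each micro-batch by key and keeps one state
  // per key between batches.
  final class Driver {
    private val states = mutable.Map.empty[String, SimpleState[Set[String]]]

    def runBatch(batch: Seq[(String, String)]): Seq[String] =
      batch.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (key, rows) =>
        val state = states.getOrElseUpdate(key, new SimpleState[Set[String]])
        processBatch(key, rows.iterator.map(_._2), state).toList
      }
  }
}
```

Running two batches through the same Driver shows the state surviving between invocations: a host that ran whoami in the first batch and ipconfig in the second reports both commands in the second batch.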

Applying the Flux Capacitor

From a user's perspective, using the Flux Capacitor to perform anomaly detection is quite simple.

Let’s suppose the user has already created a dataframe taggedEventsDF. In later articles, we will show how the sigma compiler can be leveraged to create this dataframe.

val taggedEventsDF = ...

val specification = """
rules:
  - rulename: rule1
    description: ordered list of events by host
    action: temporal
    ordered: true
    tags:
      - name: recon_cmd_a
      - name: recon_cmd_b
      - name: recon_cmd_c

  - rulename: rule3
    description: propagate to all descendants
    action: ancestor
    tags:
      - name: ancestor_process_feature1
    parent: parent_id
    child: id
"""

val tagCapacity = 200000
val flux = new FluxCapacitorMapFunction(tagCapacity, specification)

Once the FluxCapacitorMapFunction is created, it needs to be passed to the Spark flatMapGroupsWithState. The user also needs to specify which of the columns holds the host ID.

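import com.google.common.hash.BloomFilter
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}

// create encoders to serialize/deserialize the output rows and the bloom
val outputEncoder = RowEncoder(taggedEventsDF.schema).resolveAndBind()
val stateEncoder = Encoders.javaSerialization(classOf[BloomFilter[CharSequence]])

// we associate a bloom with each host
val groupKeyIndex = taggedEventsDF.schema.fieldIndex("host_id")

// groupByKey needs an implicit Encoder[String] in scope
taggedEventsDF
  .groupByKey(row => row.getString(groupKeyIndex))
  .flatMapGroupsWithState(
    OutputMode.Append(),
    GroupStateTimeout.NoTimeout()
  )(flux.processBatch)(stateEncoder, outputEncoder)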

We associate one bloom with each host, so the grouping key passed to groupByKey is the host_id column.

Spark needs to know how to serialize and deserialize the state. We create Spark encoders for the output rows and for the bloom filter (state).

We specify an output mode of Append rather than Update. In Append mode, every row we output is added to the result table, which is what we want for anomaly detection.

Finally, we pass our flux.processBatch to the flatMapGroupsWithState function.

Conclusion

In this article we showed how we implemented a configurable and re-usable stateful mapping function capable of handling multiple use cases: temporal proximity correlation (ordered and un-ordered) and parent/child relationships (including ancestors).

Currently, our state never times out and we always use the same bloom filter, so the bloom filter will eventually fill up. In our next article, we will address this issue with a forgetful bloom filter and optimize the performance of the Flux Capacitor.

All images unless otherwise noted are by the author


Data democratization advocate and principal engineer at Canadian Centre for Cyber Security | jean-claude.cote@cyber.gc.ca | Twitter @cybercentre_ca