Actionable Data With MongoDB and Go

Jun 25, 2014

If you are building any kind of application for a consumer based product, it is common to have large amounts of application data being generated about your users. Running reports is a traditional use of this data, but what if you could make this data actionable? What if you could adapt the user experience by aggregating and testing this data against rules that could dictate actions or special messaging?

In this post I am going to show you a way that could be used to make your application data "actionable". We are going to review a sample program written in Go that leverages MongoDB’s aggregation pipeline to execute rules that aggregate and test data. The rule the program defines and executes determines what messaging we want to show the user based on their current set of financial transactions in the system.

The Data
We have two collections of data we will be using which are being hosted in a public MongoLab database. The first collection contains credit card transactions for users. This data represent application generated data:

Collection: demo_user_transactions
    "user_id" : "396bc782-6ac6-4183-a671-6e75ca5989a5",
    "description" : "Chevron",
    "amount" : 23.76,
    "date" : ISODate("2014-04-20T00:00:00.000Z"),
    "category" : "gas"
    "user_id" : "396bc782-6ac6-4183-a671-6e75ca5989a5",
    "description" : "Fandango",
    "amount" : 15.6,
    "date" : ISODate("2014-04-20T00:00:00.000Z"),
    "category" : "movies"

The second collection contains a set of user advice which is presented to the user based on some aspect of their financial transactions. This data represents configuration data:

Collection: demo_user_advice
    "advice_id" : 1,
    "title" : "Dealing with Rising Gas Prices",
    "link" : "",
    "desc" : "With gas prices rising, and no relief in sight, looking for…"
    "advice_id" : 2,
    "title" : "Ways To Stretch A Dollar",
    "link" : "",
    "desc" : "Are you looking for ways to make your dollar go further?…"

We can imagine the transaction data is changing as the system refreshes user transactions throughout the day. The advice data is configurable and can be changed by marketing or business analyst people throughout the day as well. These two collections will allow us to create rules and adapt the user experience by providing the most relevant advice for the target user.

The Rule
To make the data actionable we need to define a rule. For this sample program, a rule defines a test expression that is run through the aggregation pipeline to determine a true or false condition which then results in an action. For this example we are going to create a rule that looks at a user’s purchasing of gasoline. Based on the amount of money the user spends, the action will be to display a piece of advice that is relevant.

Let’s look at the rule we are going to run and the success and failure actions:

  "test" : {
    "collection" : "demo_user_transactions",
      "expressions" : [
          "{ "$match" : { "user_id" : "#userId#", "category" : "gas" }}",
          "{ "$group" : { "_id" : { "category" : "$category" }, "amount" : { "$sum" : "$amount" }}}",
          "{ "$match" : { "amount" : { "$gt" : 20.00}}}"
  "success" : {
      "collection" : "demo_advice",
      "expressions" : [
          "{ "$match" : { "advice_id" : 1 }}"
  "failed" : {
      "collection" : "demo_advice",
      "expressions" : [
          "{ "$match" : { "advice_id" : 2 }}"

The rule is defined as a JSON document that contains three fields each with a sub-document that contains a collection name and the set of expressions to be run through the aggregation pipeline. When the expressions for the test field returns a dataset, then the application will run the expressions defined in the success field. When no dataset is returned, then the application will run the expressions in the failed field.

If we look at the rule a bit closer we can see what we are testing:

{ "$match" : { "user_id" : "#userId#", "category" : "gas" }}
{ "$group" : { "_id" : { "category" : "$category" }, "amount" : { "$sum" : "$amount" }}}
{ "$match" : { "amount" : { "$gt" : 20.00}}}

The first pipeline operation uses a $match command to find all the documents from the demo_user_transactions collection for a specific user where the category is gas. The #userId# tag is replaced with the actual user id by the program before running the expression. Then those gas transactions are grouped and the total amount spent on gasoline is calculated using the $group command. Finally that total spend on gasoline is tested to see if it is greater than $20. If the total spend on gasoline is greater than $20, there will be a dataset returned signifying the rule is true else no dataset is returned and the rule is false.

The Code
Now that we have a brief understanding of the data and the rule, we can examine some of the code that executes this rule against our test MongoDB database. The code is in Github and you can use the following Go command if you have Go already installed:

go get

If you don’t have Go installed check out these links to get started:

To make the application dynamic and keep things simple, the rule is being kept in an individual JSON file within a project folder called rules. This allows us to change the rule without needing to change the program. Let’s examine the function that reads and decode the JSON document from within a given file into a Go struct value so the rule can be processed by the program. We have already seen the JSON document for the rule above, so here are the structs that the JSON document will be decoded into:

// operation contains a set of expressions for a collection.
type operation struct {
    Collection  string   json:"collection"
    Expressions []string json:"expressions"

// rule contains a single rule with an action.
type rule struct {
    Test    operation json:"rule"
    Success operation json:"success"
    Failed  operation json:"failed"

Support for decoding JSON documents exists within the Go standard library. Here is the function retrieveRule that decodes the JSON document found in rules/advice.json into a value of type rule:

 92 // retrieveRule reads and unmarshals the specified rule data file.
 93 func retrieveRule(ruleName string) (*rule, error) {
 94     // Open the file.
 95     file, err := os.Open("rules/" + ruleName + ".json")
 96     if err != nil {
 97         return nil, err
 98     }
100     // Schedule the file to be closed once the function returns.
101     defer file.Close()
103     // Decode the file into a value of the rule type.
104     var r rule
105     err = json.NewDecoder(file).Decode(&r)
107     // We don’t need to check for errors, the caller can do this.
108     return &r, err
109 }

The function is passed a rule name which is used to form the path to the rule file. On line 95 the file is opened and then on line 104 a variable named r of type rule is declared. In combination with the json package’s NewDecoder function followed by the Decode method on line 105, the value of the r variable is updated to reflect the JSON document’s data. Then the address of the variable is passed out of the function for use on line 108.

Next let’s look at the rule processing function named processRule:

111 // processRule processes the rule and displays the results.
112 func processRule(session *mgo.Session, r *rule, user string) error {
113     // Process the rule and check for results
114     log.Println("Test:")
115     results, err := executeOperation(session, r.Test, user)
116     if err != nil {
117         log.Println("Unable To Process Action", err)
118         return err
119     }
121     if len(results) == 0 {
122         // If no result is returned, provide the failed result
123         log.Println("Failed:")
124         _, err = executeOperation(session, r.Failed, user)
125     } else {
126         // Provide the success result
127         log.Println("Succeeded:")
128         _, err = executeOperation(session, r.Success, user)
129     }
131     if err != nil {
132         log.Println("Unable To Process Action", err)
133     }
135     return err
136 }

This function on line 115 executes the expressions for the Test field using the aggregation pipeline and then on line 116 checks for errors. If there are no errors, the determination if a result was returned is tested on line 121. If there is no result, the function executes the set of expressions for the Failed field on line 124, else the Success field expressions are executed on line 128. A test for an error value occurs one more time on line 131 for logging purposes and then the function returns.

Let’s look at one more function named executeOperation to see how the expressions are actually sent to the aggregation pipeline for processing:

138 // executeOperation builds an aggregation pipeline query based on the
139 // configured expressions for the operation.
140 func executeOperation(session *mgo.Session, op operation, user string) ([]bson.M, error) {
141     var err error
142     expressions := make([]bson.M, len(op.Expressions))
144     // Iterate through the set of expressions and build the slice
145     // of operations.
146     for index, exp := range op.Expressions {
147         if index := strings.Index(exp, "#userId#"); index >= 0 {
148             exp = strings.Replace(exp, "#userId#", user, -1)
149         }
151         log.Println(exp)
152         expressions[index] = decodeExpression(exp)
153     }
155     // Capture a collection so we can execute the expressions.
156     collection := session.DB(TestDatabase).C(op.Collection)
157     if collection == nil {
158         return nil, fmt.Errorf("Collection %s does not exist", op.Collection)
159     }
161     // Execute the expressions against the aggregation pipeline.
162     var results []bson.M
163     err = collection.Pipe(expressions).All(&results)
165     // Pretty print the result.
166     output, _ := json.MarshalIndent(results, "", " ")
167     log.Println(string(output))
169     return results, err
170 }

On line 142 we declare a slice of type bson.M named expressions to hold all the expressions we need to execute and then between lines 146 through 153 we iterate through that set of expressions. The first thing that is done in each iteration on line 147 is to check the expression for a user id tag. If the tag is found, it is replaced with the actual user id. On line 151 the expression is displayed and finally on line 152 the expression is decoded into a map of type bson.M and assigned to the expressions slice.

Once the slice of expressions is ready for execution, we capture a collection value from the session value on line 156 and execute the expressions against that collection on line 163. The results from the aggregation pipeline are then returned from the function for processing, which take the form of another map of type bson.M. One line 166 we create a pretty print view of the results and then display those results on line 167.

Now that we have seen the code, let’s look at the results when we run the program for the data and the rule we reviewed.

The Result
When we run the program we see the following output:

{ "$match" : { "user_id" : "396bc782-6ac6-4183-a671-6e75ca5989a5", "category" : "gas" }}
{ "$group" : { "_id" : { "category" : "$category" }, "amount" : { "$sum" : "$amount" }}}
{ "$match" : { "amount" : { "$gt" : 20.00}}}
    "_id": {
      "category": "gas"
    "amount": 23.76
{ "$match" : { "advice_id" : 1 }}
    "_id": "53a70e71792ac1bbba4b016e",
    "advice_id": 1,
    "desc": "With gas prices rising, and no relief in sight, looking for…",
    "link": "",
    "title": "Dealing with Rising Gas Prices"

The first section of the output shows the Test expressions that are executed through the aggregation pipeline with the result immediately following. The results show the total spend for the user was $23.76. Since this value is greater than $20.00, the code then proceeds to execute the Success expressions. This one expression returns the advice document for the advice about managing transportation costs.

If we change the rule so it fails we should get the other piece of advice:

{ "$match" : { "user_id" : "396bc782-6ac6-4183-a671-6e75ca5989a5", "category" : "gas" }}
{ "$group" : { "_id" : { "category" : "$category" }, "amount" : { "$sum" : "$amount" }}}
{ "$match" : { "amount" : { "$lt" : 20.00}}}
{ "$match" : { "advice_id" : 2 }}
    "_id": "53a70e8a792ac1bbba4b016f",
    "advice_id": 2,
    "desc": "Are you looking for ways to make your dollar go further? …",
    "link": "",
    "title": "Ways To Stretch A Dollar"

This time we changed the rule to test for the total spend on gasoline to be less than $20.00. This change resulted in a null dataset so the code proceeded to execute the Failed expressions. This time the advice is more generalized around saving money.

MongoDB with the aggregation pipeline allows you to design programs that can make your data actionable. This small Go program provides one way you can declare and process rules against data in your MongoDB collections and adapt your users experience. Using Go to drive your rules engine gives you the added benefit of high performance and concurrent processing. Together, MongoDB and Go can help you build these robust systems faster, smarter and with a ton of flexibility.