Transactions(Atomicity)

Applications sometimes have the need to group a number of activities into a transaction, where if any of the activities fail none of them will occur. Following is a description how this can be done.

Atomic and NonAtomic Activities

A transaction can consist of two types of activities, atomic and nonatomic.

Atomic activies are considered more critical. They are validated before being executed. An example would be debiting money from an account. At any given time, we would want only one debit occuring on a given account so that we can be sure that once the validation has occured, the debit can succeed.

NonAtomic activites are meant to occur after all of the atomic activites are executed. Examples would be writing to a database or forwarding data to an external system. These items will eventually occur.

Event Reliability is used to make sure this happens.

It should be noted that the NonAtomic activties are meant as a post processing stage for whatever atomic activities have occurred. It should not be used to promote the next step of the business flow.

Business Validations

Sometimes you would want to fail the execution of the activities due to some business oriented rules, such as:

  • All activities sent from a specific location shoult not be performed.
  • All activities which are not a multiplication of 50 should not be performed.
  • All debits on a dormant account should be failed.
  • All debits that are twice the balance in the account should be passed only if the account has enough reserved ammount on some safe deposit.

These business oriented rules, called business validations, are being performed before the activities (Atomic and NonAtomic) start executing, and assure:

  • If business validations fail, then the activities will not perform.
  • If business validations pass, then the activities will or will not perform depending on the activity level validations.

This is also your extension point to bring in to the actor all the data required to perform atomic validations, as they must be executed synchronously.

Relevant Models

ActorActivity describes the atomic and non atomic activities.
BaseActorEntity is the base class for objects that we want to be part of a transaction
BaseJournalEntry is the base class for the container of atomic and nonatomic activites, i.e. the actual transaction.

You must extend BaseActorEntity and BaseJournalEntity as described below. ActorActivity can used without extending.

How to Use ActorActivity

ActorActivity describes the atomic and nonatomic activities that are part of the JournalEntry (transaction).

Following are the relevant properties: modelName the modelName of the Actor (extended from BaseActorEntity) that this activity will apply to. entityId the id (instance) of the particular Actor. instructionType user defined type of transaction (defined in the class extended from BaseActorEntity). value the value that will be applied to the Actor.

Example Actor Activity
{
        "entityId": "666",
        "payload": {"value": 1000},
        "modelName": "Inventory",
        "instructionType": "CREDIT"
}
JSON definition of ActorActivity
{
  "name": "ActorActivity",
  "base": "PersistedModel",
  "idInjection": true,
  "properties": {
    "entityId": {
    "type": "string"
    },
    "payload": {
      "type": "object"
    }, 
    "modelName": {
           "type": "string"
    },
    "instructionType": {
      "type":"string"
    }
  },
  "validations": [],
  "relations": {},
  "acls": [],
  "methods": {}
}

ActorActivity can be extended to include whatever properties are needed for the particular business situation. This object will be passed into the callback functions described below.

How to Extend BaseActorEntity

You will have to define the following variables:
atomicTypes - an array of user defined types that will identify the atomic actions.
nonAtomicTypes - an array of user defined types that will identify the nonatomic actions.
associatedModels - an array of journal based models (transactions) that can contain this actor.
Any properties that should available within the functions described below must be declared/inserted at the creation of the Actor instance. The object must be named “stateObj”. An example is given below in the test data.

You will have to define the following functions:
validateConditionfunction(actor, activity) Test apply the action to see if it would suceed
activity - the actorActivity that was written to the JournalEntity. The base object contains:
instructionType - the type that was written in the ActorActivity.
value - the value that will be applied (which was written in the ActorActivity).
actor - the current up to date “actor” instance in memory.
This function should return True or False depending on the validation.
Pay attention - The execution of this function should be synchronous.

atomicInstructionsfunction(actor, activity) Apply the atomicAction.
activity - the actorActivity that was written to the JournalEntity. The base object contains:
instructionType - the type that was written in the ActorActivity.
value - the value that will be applied (which was written in the ActorActivity).
actor - the current up to date “actor” instance in memory.
This function should return the modified actor.
Pay attention - The execution of this function should be synchronous.

nonAtomicInstructionsfunction(actor, activity) Apply the nonatomic action.
activity - the actorActivity that was written to the JournalEntity. The base object contains:
instructionType - the type that was written in the ActorActivity.
value - the value that will be applied (which was written in the ActorActivity).
actor - the current up to date “actor” instance in memory.
This function should return the modified actor.

processPendingMessage(message, actor) process message from this Actor’s queue.
message - the message to be applied, instructionType is the instructionType that appeared in the ActorActivity. actor - the current up to date “actor” instance in memory.
This function should return the modified actor.
Pay attention - The execution of this function should be synchronous.

This function is called as by the base entity before a validateCondition is called. It is meant to get the most current snapshot of this actor so that the validation will be performed correctly. It should handle all instruction types that can occur.

Here is an example of an Inventory model based on BaseActorEntity:

module.exports = function(Inventory) {

    Inventory.prototype.atomicTypes = ['DEBIT'];

    Inventory.prototype.nonAtomicTypes = ['CREDIT'];

    Inventory.prototype.associatedModels = ['InventoryTransaction'];

    Inventory.prototype.validateCondition = function(stateObj, activity) {
        if (activity.instructionType === 'DEBIT') {
            return stateObj.quantity >= activity.payload.value;
        }
    };

    Inventory.prototype.atomicInstructions = function(stateObj, activity) {
        if (activity.instructionType === 'DEBIT') {
            stateObj.quantity = stateObj.quantity - activity.payload.value;
            return stateObj;
        }
    };

    Inventory.prototype.nonAtomicInstructions = function(stateObj, activity) {
        if (activity.instructionType === 'CREDIT') {
            stateObj.quantity = stateObj.quantity + activity.payload.value;
            return stateObj;
        }
    };

    Inventory.prototype.processPendingMessage = function(message, stateObj) {
        if (message.instructionType === 'CREDIT') {
            stateObj.quantity +=  message.payload.value;
        } else if (message.instructionType === 'DEBIT') {
            stateObj.quantity -=  message.payload.value;
        }
        return stateObj;
    };
};

How to Extend BaseJournalEntity

Create the class via the .json file in common/models/framework/

{
  "name": "InventoryTransaction",
  "base": "BaseJournalEntity",
  "idInjection": true,
  "properties": {
  },
  "validations": [],
  "relations": {},
  "acls": [],
  "methods": {}
}

Also, You will have to define the following function:
performBusinessValidations(cb) which executes all business validations.
You will find out more about business validations later on in this guide.
For now, add the trivial implementation of performBusinessValidations(cb) in the .js file under common/models/framework/

module.exports = function(InventoryTransaction) {
    InventoryTransaction.prototype.performBusinessValidations = function(cb) {
        cb();
    };
};

Example - Inventory Debits

In the following example we will create and execute a transaction that has 2 devit activities from separate invetory accounts and have two credits as nonatomic activities.

Step 1 - Create inventory accounts

In the explorer page (localhost:3000/explorer), post the following to the Inventory class:

[{"id":"666", "stateObj":{"quantity":0}},{"id":"777", "stateObj":{"quantity":0}},{"id":"888", "stateObj":{"quantity":0}},{"id":"999", "stateObj":{"quantity":0}}]

Step 2 - Add inventory

Post the follow to InventoryTransaction

{
    "nonAtomicActivitiesList": [
      {
        "entityId": "666",
        "payload": {"value": 1000},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "CREDIT"
      },
      {
        "entityId": "777",
        "payload": {"value": 1000},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "CREDIT"
      },
      {
        "entityId": "888",
        "payload": {"value": 1000},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "CREDIT"
      },
      {
        "entityId": "999",
        "payload": {"value": 1000},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "CREDIT"
      }
    ]
}

Now both inventories have a positive balance of 1000.

Step 3 - Create (and Post) the Transaction

The transaction will have two atomic activities, a debit of 100 from each of two inventories. After that the nonatomic activity will add 100 to the other two inventories.

Post the following to InventoryTransaction

{
    "atomicActivitiesList": [
      {
        "entityId": "666",
        "payload": {"value": 100},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "DEBIT"
      },
      {
        "entityId": "999",
        "payload": {"value": 100},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "DEBIT"
      }
    ],
    "nonAtomicActivitiesList": [
      {
        "entityId": "777",
        "payload": {"value": 100},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "CREDIT"
      },
      {
        "entityId": "888",
        "payload": {"value": 100},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "CREDIT"
      }
    ]
}

Step 4 - Produce a Validation Error

By trying to debit a very large amount from one account, a validation error is produced and the whole transaction is aborted.

{
    "atomicActivitiesList": [
      {
        "entityId": "666",
        "payload": {"value": 100},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "DEBIT"
      },
      {
        "entityId": "999",
        "payload" : {"value": 100000},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "DEBIT"
      }
    ],
    "nonAtomicActivitiesList": [
      {
        "entityId": "777",
        "payload": {"value": 100},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "CREDIT"
      },
      {
        "entityId": "888",
        "payload": {"value": 100},
        "modelName": "Inventory",
        "isProcessed": false,
        "instructionType": "CREDIT"
      }
    ]
}

How to implement Business Validations

Business validations are part of the BaseJournalEntity extended model.

You will have to define the following function:
performBusinessValidations(options, cb) which executes all business validations.
options - mandatory options object, can be used to perform database calls (find, update, etc). cb - the callback that should be called when all business validations (synchronous or asynchronous) are finished executing.
If an error occured during the business validations - call cb(err).
If all business validations have passed and you want to proceed with performing the activities - call cb().

Pay attention - if the business validations entail code that is being executed asynchronously, then cb should be called in the callback of the last asynchronus execution.

Several examples for business validations

Synchronous execution:

module.exports = function(InventoryTransaction) {
    InventoryTransaction.prototype.performBusinessValidations = function(options, cb) {
        //synchronous code execution
        var count = 100000000;
        while (count > 0) {
            count--;
        }
        if(count === 0) {
            cb();
        } else {
            cb(new Error('Business validation 01 has occured.'));
        }
    };
};

Asynchronous execution:

module.exports = function(InventoryTransaction) {
    InventoryTransaction.prototype.performBusinessValidations = function(options, cb) {
        //asynchronous code execution
        var xmlHttp = new XMLHttpRequest();
        xmlHttp.onreadystatechange = function() { 
            if (xmlHttp.readyState == 4 && xmlHttp.status == 200) {
                //some more code. if it is asynchronous, the cb() should be called when that asynchronous operation callback is finishing execution.
                cb();
            } else {
                cb(new Error(xmlHttp.responseText));
            }
        }
        xmlHttp.open("GET", 'https://www.google.co.il', true); // true for asynchronous 
        xmlHttp.send(null);
    };
};

The asynchronous code can be replaced with any I/O operations (for example, querying data on the actor and then querying some other data from the DB).
The only thing that you have to make sure is that the cb is called on the callback of the last asynchronous execution.

Retrieve Actor Instances from Memory

After performing a transaction, there is a small period of time in which the updated instance of an actor is in memory but not yet saved in DB.

You might want to retrieve, at that period of time, the actor’s instance.
In other words, to get the instance from memory instead of DB.

In order to get an actor instance from memory, you need to use the REST api for GET /pluralModelName and add a “where” query with the “id” of the actor instance you wish to retrieve.

For the “Inventory” example above, after you post a transaction you need to:

  1. Use the REST api for GET /Inventories.
  2. Add the filter: {“where”:{“id”:inventory_id}} to the request.
    The result will be the inventory with inventory_id from the memory.

Limitations and known issues:

  • If there is no “id” property to the “where” object in the query, the query will proceed to the DB.
  • If the actor has relations other than the “State” model, the REST api for GET /BaseActorEntities/{id}/relationName will result with an error.
    For the “State” relation it would work.

Configurations

Before you start using oeCloud Framework’s atomicity feature there are some configurations that need to be made according to your use case.
The following system parameters will be intialized with default values unless defined in server/config.json file:

  • memoryPoolSize - The size of the lru cache of actors in memory, this parameter allows you to define the maximum amount of actors that will reside in memory.
    Setting this parameter too low will result in “thrashing” under load(loading actors from db to memory and back all the time).
    We suggest monitoring the logs in order to identify this problem (logs from memory-pool module that say “removing actor X from LRU”)

Summary

We have seen how we can create a transaction, add business validations to it, and have everything executed.