sub-title

Also check Orama's Quora and Orama's GitHub
I shall not claim to know so much, but only that I learn new things everyday

Monday, 11 April 2022

ORM Event Listener – a case of Flask/SQLAlchemy with Propagation

Introduction

You have probably already heard of (or even used) before_insert, before_update, after_delete, after_update, etc in some programming language. These are very crucial database events in Object Oriented Programming (OOP), and we should immediately think of the concept of Object Relational Mapping (ORM).

ORM is in every developer’s toolkit as long as you are dealing with data-driven applications. ORM is a technique for querying and manipulating data from a database using an object-oriented paradigm. We can use ORM instead of SQL when developing Object Oriented applications, which is kind of the de facto way of developing applications.

When using ORM, you code in the same language used for development (not SQL) to manipulate the database using CRUD operations, which makes it very convenient. Moreover, the ORM code is database-agnostic, so if you change your database, you do not have to change the ORM code, unlike when you use SQL.


ORM Event Listener

Regarding ORM Event Listener, usually one wants to perform some task following a database action such as before_insert, before_update, after_insert, after_update, after_delete, etc. There are many use-cases of the above, but one particular use-case is that you want to log user actions in a database. E.g. when a user changes something in the database, you want to log the username, time, action (update in this case), old value and new value. Simple, isn't it?

ORMs are usually designed to have functionality that can take care of such event listeners. As we use Flask, the obvious ORM is SQLAlchemy. I can bet you already know about SQLAlchemy if you are a Python developer.

We use the decorator event.listens_for to implement SQLAlchemy event listener. The function in the decorator has parameters mapper, connection and target, where 'target' is the object instance.


Examples of ORM Event Listener

For starters, let’s say you want to send a notification email every time a user account is created
@event.listens_for(User, 'after_insert')
def send_email_notificaton(mapper, connection, target):
      #send email that target.username has been created, assume that the target class has a property username

Another simple example, supposing we want to log delete actions for all tables (models in SQLAlchemy). Assume that all models inherit from a base model called MyBaseModal. Then we can use Propagation to listen to all models.
@event.listens_for(MyBaseModel, "after_delete", propagate=True)
def send_email_notificaton(mapper, connection, target):
	#log the delete action into a log table

Below, I want to show a more complex example for logging all inserts, updates and deletes. In other words, I want to implement event listsners for after_insert, after_update and after_delete. The model for the audit trail, which I am using for logging, has been named AuditTrail. I have tried as much as possible to make the property names self explanatory.

AuditTrail model has the following class definition:
class AuditTrail(db.Model): #do not inherit MyBaseModel
    '''This is a docstring - brief description about the class - accessed using __doc__
    '''
    __tablename__ = 'audit_trail'

    # added with_variant(db.INT() to make it work with sqlite
    id = db.Column(db.BIGINT().with_variant(
        db.INT(), "sqlite"), primary_key=True)
    record_id = db.Column(db.INTEGER())
    date_action = db.Column(db.DateTime, nullable=False, default=datetime.now)
    action_by = db.Column(db.INTEGER())
    action_type = db.Column(db.INTEGER())
    table_name = db.Column(db.String(20))
    field_name = db.Column(db.String(20))
    old_val = db.Column(db.String(10))
    new_val = db.Column(db.String(10))
    field_keys = db.Column(db.String(255))
    field_values_old = db.Column(db.String(255))
    field_values_new = db.Column(db.String(255))
    notes = db.Column(db.String(255))

 
This is a helper function (get_object_changes) for after update, which we shall use later in the  after update event listener below. You may skip it for now and return to it when you encounter the function call.

def get_object_changes(obj):
    """ Given a model instance, returns dict of pending
    changes waiting for database flush/commit.

    e.g. {
        'some_field': {
            'before': *SOME-VALUE*,
            'after': *SOME-VALUE*
        },
        ...
    }
    """
    inspection = inspect(obj)
    changes = {}
    for attr in [i for i in class_mapper(obj.__class__).column_attrs if i.key not in ['date_created', 'last_updated', 'version']]:
        if getattr(inspection.attrs, attr.key).history.has_changes():
            if get_history(obj, attr.key)[2]: #index 2 is the old value
                before = get_history(obj, attr.key)[2].pop()
                if isinstance(before, str) and before.isdecimal():
                    before = int(before)
                after = getattr(obj, attr.key)
                if isinstance(after, str) and after.isdecimal():
                    after = int(after)
                if before != after:
                    if before or after:
                        changes[attr.key] = {'before': before, 'after': after}
    return changes

Listening to after insert event for all models via propagation (simple)
@event.listens_for(MyBaseModel, "after_insert", propagate=True)
def audit_log_insert(mapper, connection, record):
    if current_user and record.__class__.__name__ not in ['User', 'AuditLog'] and app.config['AUDIT_LOG'] == True:
        record_dict = record.__dict__.items()
        field_keys = '; '.join([str(k) for k,v in record_dict]) # if isinstance(v, str)
        field_values_new = '; '.join([str(v) for k,v in record_dict]) # if isinstance(v, str)
        user = db.session.query(User).filter(User.email == current_user.email).first().id
        log = AuditTrail(action_by=user, action_type=0, table_name=record.__class__.__tablename__, record_id=record.id, field_keys=field_keys, field_values_new=field_values_new)
        db.session.add(log)

Listening to after update event for all models via propagation (fairly complex because it calls the function get_object_changes)
@event.listens_for(MyBaseModel, "after_update", propagate=True)
def audit_log_update(mapper, connection, record):
    if current_user and record.__class__.__name__ not in ['User', 'AuditLog'] and app.config['AUDIT_LOG']:
        record_dict = get_object_changes(record) #record.__dict__.items()
        if record_dict: #record_dict has elements
            field_keys = '; '.join([k for k,v in record_dict.items()]) # if isinstance(v, str)
            field_values_old = '; '.join([str(v.get('before')) for k,v in record_dict.items()]) # if isinstance(v, str)
            field_values_new = '; '.join([str(v.get('after')) for k,v in record_dict.items()]) # if isinstance(v, str)
            user = db.session.query(User).filter(User.email == current_user.email).first().id
            log = AuditTrail(action_by=user, action_type=1, table_name=record.__class__.__tablename__, record_id=record.id, field_keys=field_keys, field_values_old=field_values_old, field_values_new=field_values_new)
            db.session.add(log)

Listening to after delete event for all models via propagation (simple)
@event.listens_for(MyBaseModel, "after_delete", propagate=True)
def audit_log_delete(mapper, connection, record):
    if current_user and record.__class__.__name__ not in ['User', 'AuditLog'] and app.config['AUDIT_LOG'] == True:
        record_dict = record.__dict__.items()
        field_keys = '; '.join([str(k) for k,v in record_dict]) # if isinstance(v, str)
        field_values_old = '; '.join([str(v) for k,v in record_dict]) # if isinstance(v, str)
        user = db.session.query(User).filter(User.email == current_user.email).first().id
        log = AuditTrail(action_by=user, action_type=2, table_name=record.__class__.__tablename__, record_id=record.id, field_keys=field_keys, field_values_old=field_values_old)
        db.session.add(log)


As there are very many use cases of Event Listeners, you will find this knowledge very useful when you are developing a data-driven web-applicaion. The underlying theory applies to any programming language or development framework or ORM.

Finally, if there is anything that this post has demonstrated, it is that data structures are foundational in any software or data project. Frankly, without knowing dictionaries and lists in Python (or generally data structures in computer science), one would find it really difficult to comprehend the examples given above. This also answers the question that someone once asked me: "when do we apply data structures in software development and data science?" My answer then was "Every time we are programming". This answer is still a constant.

No comments:

Post a Comment