⚙️ Valstorm Notification System

The Valstorm notification system allows both the platform and its customers to send notifications across multiple devices. Currently, it supports web app and push notifications to mobile phones. Future channels being considered include email, text, and Slack.


🏗️ Understanding the Object Model

The notification system is broken down into three key objects, each serving a specific purpose.

Notification

This object holds the actual data to be sent, along with information on the recipient, channel, and read status. Notifications are only saved if their settings specify it. If not saved, they act as real-time web socket data updates.

class Notification(BetterBaseModel):
    id: str = str(uuid4())
    created_date: AwareDatetime = Field(default_factory=datetime.utcnow)
    modified_date: AwareDatetime = Field(default_factory=datetime.utcnow)
    created_by: SystemLookup | UserLookup | Lookup
    modified_by: SystemLookup | UserLookup | Lookup
    name: str
    channel: str
    type: str
    data: dict
    read: bool = False
    user: UserLookup
    object: Optional[Lookup] = None
    record_id: Optional[str] = None
    body: Optional[str] = None
    title: Optional[str] = None
    notification_setting: Optional[Lookup] = None
    notify: bool = True
    save: bool = True

Notification Setting

This object controls how a notification will function. It specifies the users or groups to be notified, the delivery channels, and can generate notification subscribers for future updates.

class NotificationSetting(BetterBaseModel):
    id: str = str(uuid4())
    created_date: AwareDatetime = Field(default_factory=datetime.utcnow)
    modified_date: AwareDatetime = Field(default_factory=datetime.utcnow)
    created_by: SystemLookup | UserLookup | Lookup
    modified_by: SystemLookup | UserLookup | Lookup
    name: str # The name of the notification for administrative purposes
    channel: str # The channel to send the notification on, typically your org id
    type: str # This should be unique
    data: dict # The data to be sent
    users: Optional[list] = [] # The users to send the notification to or subscribers
    groups: Optional[list] = [] # The users to send the notification to or subscribers
    create_subscribers: bool = False # whether to create subscribers
    save: bool = False # whether to save the notification to the database
    notify: bool = True # whether to send the notification
    record_alerts: bool = False # whether notifications and subscribers are related to a record
    object: Optional[dict] = None # The object to send the notification to
    push_to_mobile: bool = False # whether to send the notification to mobile
    all_users: bool = False
    add_record_owner: bool = False # whether to add the record owner to the notification
    add_owner_manager: bool = False # whether to add the record owner's manager to the notification
    display_template: Optional[str] = None # The template to use for displaying the notification in the UI
    title_template: Optional[str] = None # The template to use for the title of the notification
    action_url: Optional[str] = None # The url to open when the notification is clicked
    parent_schema_api_name: Optional[str] = None # The parent schema api name if this is a related object
    child_collection_key: Optional[str] = None # The child collection key if this is a related object
    icon: Optional[str] = None # The icon to use for the notification
    notify_sender: bool = False # whether to notify the sender of the notification
    sender: Optional[Lookup] = None # The sender of the notification, typically system or current user
    subscriber_creation_function: Optional[Lookup] = None
    subscriber_creation_automation: Optional[Lookup] = None

Notification Subscriber

This object determines how notifications work for an individual user. By default, it inherits settings from a Notification Setting but can be customized. This is useful for fine-tuning record-level alerts, allowing users to opt in or out of specific notifications.

class NotificationSubscriber(BetterBaseModel):
    id: str = str(uuid4())
    created_date: AwareDatetime = Field(default_factory=datetime.utcnow)
    modified_date: AwareDatetime = Field(default_factory=datetime.utcnow)
    created_by: SystemLookup | UserLookup | Lookup
    modified_by: SystemLookup | UserLookup | Lookup
    name: str
    user: UserLookup
    active: bool = True
    notification_setting: Lookup
    record_id: Optional[str] = None
    object: Optional[Lookup] = None

Notification System Object Relationships

This chart shows how a Notification Setting acts as the central hub, creating both Notifications and Notification Subscribers.

graph TD subgraph Core Objects A[Notification Setting] --> B[Notification]; A --> C[Notification Subscriber]; end B -- "is generated by" --> A; C -- "is created from" --> A;

This second chart provides a more detailed look, including the relationships with users and specific records, based on the record_id and user fields you've provided.

graph TD subgraph How it Works NS[Notification Setting] --> N[Notification]; N --> U[User]; NS --> NSB[Notification Subscriber]; NSB --> U; N -- "relates to" --> R[Record]; NSB -- "relates to" --> R; end

🚀 How to Use the System

  1. Create a notification setting that specifies who you want to alert and how.
  2. Determine your sending strategy there are a few ways to trigger notifications. See the How to Send Notifications Section
  1. (Optional) If your notification setting is configured to create subscribers, you can manage these subscribers to customize who receives notifications for specific records via a few ways detailed in the subscriber management send_notifications_handler

How to Send Notifications

Your options for sending notifications include:

  1. Directly calling the send_notifications_handler function within your code, passing in the relevant notification setting and data.
  2. Using record triggers to automatically send notifications when certain events occur on specific records.
  3. Using scheduled triggers to send notifications at predetermined times or intervals.
  4. Sending a POST request to the endpoint: https://api.valstorm.com/v1/notifications
  5. Using the Send Notification node in the automation builder to send notifications as part of an automated workflow.

Subscriber management

Declaring the Users on the Notification Setting

Users and groups can be declared directly on the notification setting. These users will receive notifications when the event is triggered.

Using a Function to Assign Subscribers

You can create a function that dynamically assigns subscribers based on the context of the notification. This function should accept three parameters: data, current_user, and notification_setting.

Important Notes

  1. The function must return a list of user IDs to be added as subscribers.
  2. Functions should only create new subscribers by checking if the record already has subscribers for the current notification. Otherwise, this will run every time a notification is sent and add a lot of overhead.
    • If our team notices excessive function execution, we will reach out to discuss optimizing your implementation. This may include additional costs.
    • To optimize your function, leverage this example as a way to check if the record already has subscribers for this notification type. Only create new ones.
      def _prepare_initial_data_maps(conversation: dict, notification_setting: NotificationSetting) -> tuple:
      """
      Processes the initial new_data list once to create essential maps and sets.
      This avoids re-iterating through new_data in later steps.
      """
      contact_ids_to_query = set()
      
      # Skip conversations that already have subscribers of this TYPE
      for subscriber in conversation.get('subscribers', []):
          if notification_setting.name in subscriber.get('name', ''):
              return contact_ids_to_query
      
      # Process all contacts
      contacts = conversation.get('contacts')
      for contact in contacts:
          contact_id = contact.get('id')
          contact_ids_to_query.add(contact_id)
                  
      return contact_ids_to_query
      

data Must be the data from the notification. Generally, the data you want to access will be one layer lower at data.data

current_user The current user running the notification event

notification_setting The related notification setting that controls this event

Functions can be created at the function list

Here is a sample function that assigns subscribers to a text conversation based on the owner of a Salesforce (Person) Account or Contact record

from valstorm.models import User, NotificationSetting
from valstorm.salesforce import Salesforce, query_salesforce
from valstorm.dependencies import add_log
from valstorm.query import sql_query


def _prepare_initial_data_maps(conversation: dict, notification_setting: NotificationSetting) -> tuple:
    """
    Processes the initial new_data list once to create essential maps and sets.
    This avoids re-iterating through new_data in later steps.
    """
    contact_ids_to_query = set()
    
    # Skip conversations that already have subscribers of this TYPE
    for subscriber in conversation.get('subscribers', []):
        if notification_setting.name in subscriber.get('name', ''):
            return contact_ids_to_query

    # Process all contacts
    contacts = conversation.get('contacts')
    for contact in contacts:
        contact_id = contact.get('id')
        contact_ids_to_query.add(contact_id)
                
    return contact_ids_to_query

def _fetch_and_map_salesforce_owners(sf: Salesforce, sf_ids_by_type: dict, current_user: User) -> dict:
    """
    Queries Salesforce for Accounts and Contacts and returns a map of 
    Salesforce Record ID -> Owner ID.
    """
    sf_owner_map = {}
    for object_type, ids in sf_ids_by_type.items():
        if not ids:
            continue
        ids = tuple(ids) if len(ids) > 1 else f"('{ids[0]}')"
        sf_query = f"SELECT Id, OwnerId FROM {object_type} WHERE Id IN {ids}"
        results = query_salesforce(sf_query, sf)
        if results.status_code != 200:
            add_log(f"Salesforce query failed for {object_type}: {results.text}", 'error')
            continue
        results = results.json()

        for record in results.get('records', []):
            if record.get('Id') and record.get('OwnerId'):
                sf_owner_map[record['Id']] = record['OwnerId']
                
    return sf_owner_map

def execute(data: dict, current_user: User, notification_setting: NotificationSetting):
    # receives twilio_message or twilio_conversation record
    data = data.get('data')
    conversation = data.get('conversation')
    # this is a twilio_conversation record
    if not conversation:
        conversation = data
        contact_ids = _prepare_initial_data_maps(conversation, notification_setting)
    # this is a twilio_message record
    else:
        conversation_results = sql_query(
            query=f"SELECT id, contacts, subscribers FROM twilio_conversation WHERE id = {conversation.get('id')}",
            current_user=current_user,
            bypass_cache=True,
        )
        conversations = conversation_results.get('records', [])
        conversation = conversations[0] if conversations else None
        contact_ids = _prepare_initial_data_maps(conversations[0], notification_setting)
    if not conversation:
        return []
    
    contacts_result = sql_query(
        query=f"SELECT id, salesforce_id FROM contact WHERE id IN {tuple(contact_ids)}",
        current_user=current_user,
        bypass_cache=True,
    )
    sf_ids_by_type = {'Account': [], 'Contact': []}
    sf_id_to_contact_id_map = {}
    for contact in contacts_result.get('records', []):
        sf_id = contact.get('salesforce_id')
        if not sf_id:
            continue
        
        sf_id_to_contact_id_map[sf_id] = contact['id']
        if sf_id.startswith('001'):
            sf_ids_by_type['Account'].append(sf_id)
        elif sf_id.startswith('003'):
            sf_ids_by_type['Contact'].append(sf_id)
    if not sf_ids_by_type['Account'] and not sf_ids_by_type['Contact']:
        return []
    # 3. Query Salesforce for Owners
    sf = Salesforce(current_user)
    sf_owner_map = _fetch_and_map_salesforce_owners(sf, sf_ids_by_type, current_user)
    owner_sf_ids = set(sf_owner_map.values())
    
    # 4. Fetch Valstorm Users corresponding to the Salesforce Owners
    users_result = sql_query(
        query=f"SELECT id, name, salesforce_id FROM user WHERE salesforce_id IN {tuple(owner_sf_ids)}",
        current_user=current_user,
    )
    owner_id_to_user_map = {
        user['salesforce_id']: user for user in users_result.get('records', [])
    }
    user_ids = set()

    for sf_id, owner_id in sf_owner_map.items():
        user = owner_id_to_user_map.get(owner_id)
        user_ids.add(user['id'])
    return user_ids

Here is another more complicated example of a Salesforce fed subscription function

from valstorm.models import User, NotificationSetting
from valstorm.salesforce import Salesforce, query_salesforce
from valstorm.dependencies import add_log
from valstorm.query import sql_query


# Language-specific user groups
ENGLISH_USERS = [{"id": "id-one", "name": "User One"}]
SPANISH_USERS = [{"id": "id-two", "name": "User Two"}]

def _format_in_clause(ids: set | list) -> str:
    """
    Correctly formats a list or set of IDs for a SQL IN clause.
    Handles single-item lists without adding an invalid trailing comma.
    """
    if not ids:
        return "('')" # Return a valid empty clause to prevent errors
    
    # Convert all items to strings and wrap them in single quotes
    quoted_ids = [f"'{str(id_val)}'" for id_val in ids]
    
    # Join them with commas and wrap the result in parentheses
    return f"({','.join(quoted_ids)})"

def _prepare_initial_data_maps(conversation: dict, notification_setting: NotificationSetting) -> tuple:
    """
    Processes the initial new_data list once to create essential maps and sets.
    This avoids re-iterating through new_data in later steps.
    """
    contact_ids_to_query = set()
    
    # Skip conversations that already have subscribers of this TYPE
    for subscriber in conversation.get('subscribers', []):
        if notification_setting.name in subscriber.get('name', ''):
            return contact_ids_to_query

    # Process all contacts
    contacts = conversation.get('contacts')
    for contact in contacts:
        contact_id = contact.get('id')
        contact_ids_to_query.add(contact_id)
                
    return contact_ids_to_query

def _fetch_and_map_salesforce_records(sf: Salesforce, sf_ids_by_type: dict) -> dict:
    """
    Queries Salesforce for Accounts and Contacts and returns a map of 
    Salesforce Record ID -> Full Record Data.
    """
    sf_record_map = {}
    for object_type, ids in sf_ids_by_type.items():
        if not ids:
            continue
        
        # Format IDs for the SOQL IN clause
        ids_in_clause = _format_in_clause(ids)
        
        # Define the query with all necessary fields
        sf_query = f"SELECT Id, OwnerId, Intake_Completed_By__c, Primary_Language__c FROM {object_type} WHERE Id IN {ids_in_clause}"
        
        results = query_salesforce(sf_query, sf)
        if results.status_code != 200:
            add_log(f"Salesforce query failed for {object_type}: {results.text}", 'error')
            continue
        
        # Map each Salesforce ID to its full record data
        for record in results.json().get('records', []):
            if record.get('Id'):
                sf_record_map[record['Id']] = record
                
    return sf_record_map

def execute(data: dict, current_user: User, notification_setting: NotificationSetting):
    # receives twilio_message or twilio_conversation record
    data = data.get('data')
    conversation = data.get('conversation')
    # this is a twilio_conversation record
    if not conversation:
        conversation = data
        contact_ids = _prepare_initial_data_maps(conversation, notification_setting)
    # this is a twilio_message record
    else:
        conversation_results = sql_query(
            query=f"SELECT id, contacts, subscribers FROM twilio_conversation WHERE id = {conversation.get('id')}",
            current_user=current_user,
            bypass_cache=True,
        )
        conversations = conversation_results.get('records', [])
        conversation = conversations[0] if conversations else None
        contact_ids = _prepare_initial_data_maps(conversations[0], notification_setting)
    if not conversation:
        return []

    contacts_in_clause = _format_in_clause(contact_ids)
    contacts_result = sql_query(
        query=f"SELECT id, salesforce_id FROM contact WHERE id IN {contacts_in_clause}",
        current_user=current_user,
        bypass_cache=True,
    )
    sf_ids_by_type = {'Account': [], 'Contact': []}
    sf_id_to_contact_id_map = {}
    for contact in contacts_result.get('records', []):
        sf_id = contact.get('salesforce_id')
        if not sf_id:
            continue
        
        sf_id_to_contact_id_map[sf_id] = contact['id']
        if sf_id.startswith('001'):
            sf_ids_by_type['Account'].append(sf_id)
        elif sf_id.startswith('003'):
            sf_ids_by_type['Contact'].append(sf_id)
    if not sf_ids_by_type['Account'] and not sf_ids_by_type['Contact']:
        return []
    # 3. Query Salesforce for Owners
    sf = Salesforce(current_user)
    sf_record_map = _fetch_and_map_salesforce_records(sf, sf_ids_by_type)

    
    # --- 4. Get all potential Valstorm users in one query ---
    all_sf_user_ids = set()
    for record_data in sf_record_map.values():
        if record_data.get('OwnerId'): all_sf_user_ids.add(record_data['OwnerId'])
        if record_data.get('Intake_Completed_By__c'): all_sf_user_ids.add(record_data['Intake_Completed_By__c'])
    
    if not all_sf_user_ids:
        users_result = {'records': []}
    else:
        users_in_clause = _format_in_clause(all_sf_user_ids)
        users_result = sql_query(
            query=f"SELECT id, name, salesforce_id FROM user WHERE salesforce_id IN {users_in_clause}",
            current_user=current_user,
        )
    
    sf_id_to_user_map = {user['salesforce_id']: user for user in users_result.get('records', [])}
    
    # --- 5. Final Logic: Assemble the User IDs ---
    user_ids_to_return = set()

    for sf_id, record_data in sf_record_map.items():
        # 1. Get the Owner
        owner_user = sf_id_to_user_map.get(record_data.get('OwnerId'))
        if owner_user:
            user_ids_to_return.add(owner_user['id'])

        # The following logic only applies to Accounts
        if sf_id.startswith('001'):
            # 2. Get the Intake User
            intake_user = sf_id_to_user_map.get(record_data.get('Intake_Completed_By__c'))
            if intake_user:
                user_ids_to_return.add(intake_user['id'])
            
            # 3. Get the Language-specific Users
            language_users = SPANISH_USERS if record_data.get('Primary_Language__c') == 'Spanish' else ENGLISH_USERS
            for user in language_users:
                user_ids_to_return.add(user['id'])

    return list(user_ids_to_return)

Using Automations to Assign Subscribers

(Coming Soon) You can also use automations to assign subscribers. This allows for more complex logic and workflows to determine who should receive notifications.

Automations can be create at the automation builder

Here are a couple of Mermaid charts to visualize the relationships between the three core objects in the Valstorm notification system.