import MermaidChart from '../../../components/MermaidChart';

⚙️ Valstorm Notification System

The Valstorm notification system allows both the platform and its customers to send notifications across multiple devices. Currently, it supports web app and push notifications to mobile phones. Future channels being considered include email, text, and Slack.


🏗️ Understanding the Object Model

The notification system is broken down into three key objects, each serving a specific purpose.

Notification

This object holds the actual data to be sent, along with information on the recipient, channel, and read status. Notifications are only saved if their settings specify it. If not saved, they act as real-time web socket data updates.

class Notification(BetterBaseModel): id: str = str(uuid4()) created_date: AwareDatetime = Field(default_factory=datetime.utcnow) modified_date: AwareDatetime = Field(default_factory=datetime.utcnow) created_by: SystemLookup | UserLookup | Lookup modified_by: SystemLookup | UserLookup | Lookup name: str channel: str type: str data: dict read: bool = False user: UserLookup object: Optional[Lookup] = None record_id: Optional[str] = None body: Optional[str] = None title: Optional[str] = None notification_setting: Optional[Lookup] = None notify: bool = True save: bool = True

Notification Setting

This object controls how a notification will function. It specifies the users or groups to be notified, the delivery channels, and can generate notification subscribers for future updates.

class NotificationSetting(BetterBaseModel): id: str = str(uuid4()) created_date: AwareDatetime = Field(default_factory=datetime.utcnow) modified_date: AwareDatetime = Field(default_factory=datetime.utcnow) created_by: SystemLookup | UserLookup | Lookup modified_by: SystemLookup | UserLookup | Lookup name: str # The name of the notification for administrative purposes channel: str # The channel to send the notification on, typically your org id type: str # This should be unique data: dict # The data to be sent users: Optional[list] = [] # The users to send the notification to or subscribers groups: Optional[list] = [] # The users to send the notification to or subscribers create_subscribers: bool = False # whether to create subscribers save: bool = False # whether to save the notification to the database notify: bool = True # whether to send the notification record_alerts: bool = False # whether notifications and subscribers are related to a record object: Optional[dict] = None # The object to send the notification to push_to_mobile: bool = False # whether to send the notification to mobile all_users: bool = False add_record_owner: bool = False # whether to add the record owner to the notification add_owner_manager: bool = False # whether to add the record owner's manager to the notification display_template: Optional[str] = None # The template to use for displaying the notification in the UI title_template: Optional[str] = None # The template to use for the title of the notification action_url: Optional[str] = None # The url to open when the notification is clicked parent_schema_api_name: Optional[str] = None # The parent schema api name if this is a related object child_collection_key: Optional[str] = None # The child collection key if this is a related object icon: Optional[str] = None # The icon to use for the notification notify_sender: bool = False # whether to notify the sender of the notification sender: Optional[Lookup] = None # The sender of the notification, typically system or current user subscriber_creation_function: Optional[Lookup] = None subscriber_creation_automation: Optional[Lookup] = None

Notification Subscriber

This object determines how notifications work for an individual user. By default, it inherits settings from a Notification Setting but can be customized. This is useful for fine-tuning record-level alerts, allowing users to opt in or out of specific notifications.

class NotificationSubscriber(BetterBaseModel): id: str = str(uuid4()) created_date: AwareDatetime = Field(default_factory=datetime.utcnow) modified_date: AwareDatetime = Field(default_factory=datetime.utcnow) created_by: SystemLookup | UserLookup | Lookup modified_by: SystemLookup | UserLookup | Lookup name: str user: UserLookup active: bool = True notification_setting: Lookup record_id: Optional[str] = None object: Optional[Lookup] = None

Notification System Object Relationships

This chart shows how a Notification Setting acts as the central hub, creating both Notifications and Notification Subscribers.

<MermaidChart chart={graph TD subgraph Core Objects A[Notification Setting] --> B[Notification]; A --> C[Notification Subscriber]; end B -- "is generated by" --> A; C -- "is created from" --> A;} />

This second chart provides a more detailed look, including the relationships with users and specific records, based on the record_id and user fields you've provided.

<MermaidChart chart={graph TD subgraph How it Works NS[Notification Setting] --> N[Notification]; N --> U[User]; NS --> NSB[Notification Subscriber]; NSB --> U; N -- "relates to" --> R[Record]; NSB -- "relates to" --> R; end} />


🚀 How to Use the System

  1. Create a notification setting that specifies who you want to alert and how.
  2. Determine your sending strategy there are a few ways to trigger notifications. See the How to Send Notifications Section {/* 2. Trigger sending notifications via the API via the send_notifications_handler method within record triggers, scheduled triggers, or by sending a POST request to this endpoint: https://api.valstorm.com/v1/notifications. */}
  3. (Optional) If your notification setting is configured to create subscribers, you can manage these subscribers to customize who receives notifications for specific records via a few ways detailed in the subscriber management send_notifications_handler

How to Send Notifications

Your options for sending notifications include:

  1. Directly calling the send_notifications_handler function within your code, passing in the relevant notification setting and data.
  2. Using record triggers to automatically send notifications when certain events occur on specific records.
  3. Using scheduled triggers to send notifications at predetermined times or intervals.
  4. Sending a POST request to the endpoint: https://api.valstorm.com/v1/notifications
  5. Using the Send Notification node in the automation builder to send notifications as part of an automated workflow.

Subscriber management

Declaring the Users on the Notification Setting

Users and groups can be declared directly on the notification setting. These users will receive notifications when the event is triggered.

Using a Function to Assign Subscribers

You can create a function that dynamically assigns subscribers based on the context of the notification. This function should accept three parameters: data, current_user, and notification_setting.

Important Notes

  1. The function must return a list of user IDs to be added as subscribers.
  2. Functions should only create new subscribers by checking if the record already has subscribers for the current notification. Otherwise, this will run every time a notification is sent and add a lot of overhead.
    • If our team notices excessive function execution, we will reach out to discuss optimizing your implementation. This may include additional costs.
    • To optimize your function, leverage this example as a way to check if the record already has subscribers for this notification type. Only create new ones.
      def _prepare_initial_data_maps(conversation: dict, notification_setting: NotificationSetting) -> tuple: """ Processes the initial new_data list once to create essential maps and sets. This avoids re-iterating through new_data in later steps. """ contact_ids_to_query = set() # Skip conversations that already have subscribers of this TYPE for subscriber in conversation.get('subscribers', []): if notification_setting.name in subscriber.get('name', ''): return contact_ids_to_query # Process all contacts contacts = conversation.get('contacts') for contact in contacts: contact_id = contact.get('id') contact_ids_to_query.add(contact_id) return contact_ids_to_query

data Must be the data from the notification. Generally, the data you want to access will be one layer lower at data.data

current_user The current user running the notification event

notification_setting The related notification setting that controls this event

Functions can be created at the function list

Here is a sample function that assigns subscribers to a text conversation based on the owner of a Salesforce (Person) Account or Contact record

from valstorm.models import User, NotificationSetting from valstorm.salesforce import Salesforce, query_salesforce from valstorm.dependencies import add_log from valstorm.query import sql_query def _prepare_initial_data_maps(conversation: dict, notification_setting: NotificationSetting) -> tuple: """ Processes the initial new_data list once to create essential maps and sets. This avoids re-iterating through new_data in later steps. """ contact_ids_to_query = set() # Skip conversations that already have subscribers of this TYPE for subscriber in conversation.get('subscribers', []): if notification_setting.name in subscriber.get('name', ''): return contact_ids_to_query # Process all contacts contacts = conversation.get('contacts') for contact in contacts: contact_id = contact.get('id') contact_ids_to_query.add(contact_id) return contact_ids_to_query def _fetch_and_map_salesforce_owners(sf: Salesforce, sf_ids_by_type: dict, current_user: User) -> dict: """ Queries Salesforce for Accounts and Contacts and returns a map of Salesforce Record ID -> Owner ID. """ sf_owner_map = {} for object_type, ids in sf_ids_by_type.items(): if not ids: continue ids = tuple(ids) if len(ids) > 1 else f"('{ids[0]}')" sf_query = f"SELECT Id, OwnerId FROM {object_type} WHERE Id IN {ids}" results = query_salesforce(sf_query, sf) if results.status_code != 200: add_log(f"Salesforce query failed for {object_type}: {results.text}", 'error') continue results = results.json() for record in results.get('records', []): if record.get('Id') and record.get('OwnerId'): sf_owner_map[record['Id']] = record['OwnerId'] return sf_owner_map def execute(data: dict, current_user: User, notification_setting: NotificationSetting): # receives twilio_message or twilio_conversation record data = data.get('data') conversation = data.get('conversation') # this is a twilio_conversation record if not conversation: conversation = data contact_ids = _prepare_initial_data_maps(conversation, notification_setting) # this is a twilio_message record else: conversation_results = sql_query( query=f"SELECT id, contacts, subscribers FROM twilio_conversation WHERE id = {conversation.get('id')}", current_user=current_user, bypass_cache=True, ) conversations = conversation_results.get('records', []) conversation = conversations[0] if conversations else None contact_ids = _prepare_initial_data_maps(conversations[0], notification_setting) if not conversation: return [] contacts_result = sql_query( query=f"SELECT id, salesforce_id FROM contact WHERE id IN {tuple(contact_ids)}", current_user=current_user, bypass_cache=True, ) sf_ids_by_type = {'Account': [], 'Contact': []} sf_id_to_contact_id_map = {} for contact in contacts_result.get('records', []): sf_id = contact.get('salesforce_id') if not sf_id: continue sf_id_to_contact_id_map[sf_id] = contact['id'] if sf_id.startswith('001'): sf_ids_by_type['Account'].append(sf_id) elif sf_id.startswith('003'): sf_ids_by_type['Contact'].append(sf_id) if not sf_ids_by_type['Account'] and not sf_ids_by_type['Contact']: return [] # 3. Query Salesforce for Owners sf = Salesforce(current_user) sf_owner_map = _fetch_and_map_salesforce_owners(sf, sf_ids_by_type, current_user) owner_sf_ids = set(sf_owner_map.values()) # 4. Fetch Valstorm Users corresponding to the Salesforce Owners users_result = sql_query( query=f"SELECT id, name, salesforce_id FROM user WHERE salesforce_id IN {tuple(owner_sf_ids)}", current_user=current_user, ) owner_id_to_user_map = { user['salesforce_id']: user for user in users_result.get('records', []) } user_ids = set() for sf_id, owner_id in sf_owner_map.items(): user = owner_id_to_user_map.get(owner_id) user_ids.add(user['id']) return user_ids

Here is another more complicated example of a Salesforce fed subscription function

from valstorm.models import User, NotificationSetting from valstorm.salesforce import Salesforce, query_salesforce from valstorm.dependencies import add_log from valstorm.query import sql_query # Language-specific user groups ENGLISH_USERS = [{"id": "id-one", "name": "User One"}] SPANISH_USERS = [{"id": "id-two", "name": "User Two"}] def _format_in_clause(ids: set | list) -> str: """ Correctly formats a list or set of IDs for a SQL IN clause. Handles single-item lists without adding an invalid trailing comma. """ if not ids: return "('')" # Return a valid empty clause to prevent errors # Convert all items to strings and wrap them in single quotes quoted_ids = [f"'{str(id_val)}'" for id_val in ids] # Join them with commas and wrap the result in parentheses return f"({','.join(quoted_ids)})" def _prepare_initial_data_maps(conversation: dict, notification_setting: NotificationSetting) -> tuple: """ Processes the initial new_data list once to create essential maps and sets. This avoids re-iterating through new_data in later steps. """ contact_ids_to_query = set() # Skip conversations that already have subscribers of this TYPE for subscriber in conversation.get('subscribers', []): if notification_setting.name in subscriber.get('name', ''): return contact_ids_to_query # Process all contacts contacts = conversation.get('contacts') for contact in contacts: contact_id = contact.get('id') contact_ids_to_query.add(contact_id) return contact_ids_to_query def _fetch_and_map_salesforce_records(sf: Salesforce, sf_ids_by_type: dict) -> dict: """ Queries Salesforce for Accounts and Contacts and returns a map of Salesforce Record ID -> Full Record Data. """ sf_record_map = {} for object_type, ids in sf_ids_by_type.items(): if not ids: continue # Format IDs for the SOQL IN clause ids_in_clause = _format_in_clause(ids) # Define the query with all necessary fields sf_query = f"SELECT Id, OwnerId, Intake_Completed_By__c, Primary_Language__c FROM {object_type} WHERE Id IN {ids_in_clause}" results = query_salesforce(sf_query, sf) if results.status_code != 200: add_log(f"Salesforce query failed for {object_type}: {results.text}", 'error') continue # Map each Salesforce ID to its full record data for record in results.json().get('records', []): if record.get('Id'): sf_record_map[record['Id']] = record return sf_record_map def execute(data: dict, current_user: User, notification_setting: NotificationSetting): # receives twilio_message or twilio_conversation record data = data.get('data') conversation = data.get('conversation') # this is a twilio_conversation record if not conversation: conversation = data contact_ids = _prepare_initial_data_maps(conversation, notification_setting) # this is a twilio_message record else: conversation_results = sql_query( query=f"SELECT id, contacts, subscribers FROM twilio_conversation WHERE id = {conversation.get('id')}", current_user=current_user, bypass_cache=True, ) conversations = conversation_results.get('records', []) conversation = conversations[0] if conversations else None contact_ids = _prepare_initial_data_maps(conversations[0], notification_setting) if not conversation: return [] contacts_in_clause = _format_in_clause(contact_ids) contacts_result = sql_query( query=f"SELECT id, salesforce_id FROM contact WHERE id IN {contacts_in_clause}", current_user=current_user, bypass_cache=True, ) sf_ids_by_type = {'Account': [], 'Contact': []} sf_id_to_contact_id_map = {} for contact in contacts_result.get('records', []): sf_id = contact.get('salesforce_id') if not sf_id: continue sf_id_to_contact_id_map[sf_id] = contact['id'] if sf_id.startswith('001'): sf_ids_by_type['Account'].append(sf_id) elif sf_id.startswith('003'): sf_ids_by_type['Contact'].append(sf_id) if not sf_ids_by_type['Account'] and not sf_ids_by_type['Contact']: return [] # 3. Query Salesforce for Owners sf = Salesforce(current_user) sf_record_map = _fetch_and_map_salesforce_records(sf, sf_ids_by_type) # --- 4. Get all potential Valstorm users in one query --- all_sf_user_ids = set() for record_data in sf_record_map.values(): if record_data.get('OwnerId'): all_sf_user_ids.add(record_data['OwnerId']) if record_data.get('Intake_Completed_By__c'): all_sf_user_ids.add(record_data['Intake_Completed_By__c']) if not all_sf_user_ids: users_result = {'records': []} else: users_in_clause = _format_in_clause(all_sf_user_ids) users_result = sql_query( query=f"SELECT id, name, salesforce_id FROM user WHERE salesforce_id IN {users_in_clause}", current_user=current_user, ) sf_id_to_user_map = {user['salesforce_id']: user for user in users_result.get('records', [])} # --- 5. Final Logic: Assemble the User IDs --- user_ids_to_return = set() for sf_id, record_data in sf_record_map.items(): # 1. Get the Owner owner_user = sf_id_to_user_map.get(record_data.get('OwnerId')) if owner_user: user_ids_to_return.add(owner_user['id']) # The following logic only applies to Accounts if sf_id.startswith('001'): # 2. Get the Intake User intake_user = sf_id_to_user_map.get(record_data.get('Intake_Completed_By__c')) if intake_user: user_ids_to_return.add(intake_user['id']) # 3. Get the Language-specific Users language_users = SPANISH_USERS if record_data.get('Primary_Language__c') == 'Spanish' else ENGLISH_USERS for user in language_users: user_ids_to_return.add(user['id']) return list(user_ids_to_return)

Using Automations to Assign Subscribers

(Coming Soon) You can also use automations to assign subscribers. This allows for more complex logic and workflows to determine who should receive notifications.

Automations can be create at the automation builder

Here are a couple of Mermaid charts to visualize the relationships between the three core objects in the Valstorm notification system.