⚙️ Valstorm Notification System
The Valstorm notification system allows both the platform and its customers to send notifications across multiple devices. It currently supports notifications in the web app and push notifications to mobile phones. Channels under consideration for the future include email, text messages, and Slack.
🏗️ Understanding the Object Model
The notification system is broken down into three key objects, each serving a specific purpose.
Notification
This object holds the actual data to be sent, along with information on the recipient, channel, and read status. Notifications are only saved if their settings specify it. If not saved, they act as real-time web socket data updates.
class Notification(BetterBaseModel):
    # default_factory produces a fresh ID and an aware UTC timestamp for every instance
    id: str = Field(default_factory=lambda: str(uuid4()))
    created_date: AwareDatetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    modified_date: AwareDatetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    created_by: SystemLookup | UserLookup | Lookup
    modified_by: SystemLookup | UserLookup | Lookup
    name: str
    channel: str
    type: str
    data: dict
    read: bool = False
    user: UserLookup
    object: Optional[Lookup] = None
    record_id: Optional[str] = None
    body: Optional[str] = None
    title: Optional[str] = None
    notification_setting: Optional[Lookup] = None
    notify: bool = True
    save: bool = True
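The interplay of the save and notify flags can be illustrated with a small sketch. This is not Valstorm code: `plan_delivery` and the action names are hypothetical, and the sketch assumes that `save` controls persistence while `notify` controls the real-time web socket update.

```python
def plan_delivery(notify: bool = True, save: bool = True) -> list[str]:
    """Return the (hypothetical) actions taken for a notification with these flags."""
    actions = []
    if save:
        # Assumption: saved notifications are written to the database
        actions.append("persist")
    if notify:
        # Assumption: notify triggers the real-time web socket update
        actions.append("websocket")
    return actions


# A notification that is not saved acts purely as a real-time update
print(plan_delivery(notify=True, save=False))  # ['websocket']
```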
Notification Setting
This object controls how a notification will function. It specifies the users or groups to be notified, the delivery channels, and can generate notification subscribers for future updates.
class NotificationSetting(BetterBaseModel):
    # default_factory produces a fresh ID and an aware UTC timestamp for every instance
    id: str = Field(default_factory=lambda: str(uuid4()))
    created_date: AwareDatetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    modified_date: AwareDatetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    created_by: SystemLookup | UserLookup | Lookup
    modified_by: SystemLookup | UserLookup | Lookup
    name: str  # The name of the notification for administrative purposes
    channel: str  # The channel to send the notification on, typically your org id
    type: str  # This should be unique
    data: dict  # The data to be sent
    users: Optional[list] = []  # The users to send the notification to or subscribers
    groups: Optional[list] = []  # The groups to send the notification to or subscribers
    create_subscribers: bool = False  # whether to create subscribers
    save: bool = False  # whether to save the notification to the database
    notify: bool = True  # whether to send the notification
    record_alerts: bool = False  # whether notifications and subscribers are related to a record
    object: Optional[dict] = None  # The object to send the notification to
    push_to_mobile: bool = False  # whether to send the notification to mobile
    all_users: bool = False
    add_record_owner: bool = False  # whether to add the record owner to the notification
    add_owner_manager: bool = False  # whether to add the record owner's manager to the notification
    display_template: Optional[str] = None  # The template to use for displaying the notification in the UI
    title_template: Optional[str] = None  # The template to use for the title of the notification
    action_url: Optional[str] = None  # The url to open when the notification is clicked
    parent_schema_api_name: Optional[str] = None  # The parent schema api name if this is a related object
    child_collection_key: Optional[str] = None  # The child collection key if this is a related object
    icon: Optional[str] = None  # The icon to use for the notification
    notify_sender: bool = False  # whether to notify the sender of the notification
    sender: Optional[Lookup] = None  # The sender of the notification, typically system or current user
    subscriber_creation_function: Optional[Lookup] = None
    subscriber_creation_automation: Optional[Lookup] = None
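To make the recipient-related flags more concrete, here is a minimal sketch of how users, add_record_owner, add_owner_manager, and notify_sender might combine into a final recipient list. Everything in it is an assumption made for illustration: the `resolve_recipients` helper, the plain-dict setting, the `owner_id` and `manager_id` record keys, and the use of bare user ID strings. The platform's actual resolution logic is not shown in this document, and groups and all_users are left out.

```python
def resolve_recipients(setting: dict, record: dict, sender_id: str) -> list[str]:
    """Hypothetical recipient resolution based on a notification setting's flags."""
    recipients = list(setting.get("users") or [])
    if setting.get("add_record_owner") and record.get("owner_id"):
        recipients.append(record["owner_id"])
    if setting.get("add_owner_manager") and record.get("manager_id"):
        recipients.append(record["manager_id"])
    # Drop the sender unless the setting explicitly asks to notify them
    if not setting.get("notify_sender"):
        recipients = [user_id for user_id in recipients if user_id != sender_id]
    # De-duplicate while preserving order
    return list(dict.fromkeys(recipients))


setting = {
    "users": ["u1", "u2"],
    "add_record_owner": True,
    "add_owner_manager": False,
    "notify_sender": False,
}
record = {"owner_id": "u3", "manager_id": "u4"}
print(resolve_recipients(setting, record, sender_id="u2"))  # ['u1', 'u3']
```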
Notification Subscriber
This object determines how notifications work for an individual user. By default, it inherits settings from a Notification Setting but can be customized. This is useful for fine-tuning record-level alerts, allowing users to opt in or out of specific notifications.
class NotificationSubscriber(BetterBaseModel):
    # default_factory produces a fresh ID and an aware UTC timestamp for every instance
    id: str = Field(default_factory=lambda: str(uuid4()))
    created_date: AwareDatetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    modified_date: AwareDatetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    created_by: SystemLookup | UserLookup | Lookup
    modified_by: SystemLookup | UserLookup | Lookup
    name: str
    user: UserLookup
    active: bool = True
    notification_setting: Lookup
    record_id: Optional[str] = None
    object: Optional[Lookup] = None
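As an illustration of record-level opt-outs, the sketch below picks the users who should still be alerted for one record. It assumes subscribers are available as plain dicts shaped like the model above and that the user lookup carries an `id` key; the `active_subscriber_user_ids` helper is hypothetical and not part of the platform.

```python
def active_subscriber_user_ids(subscribers: list[dict], record_id: str) -> list[str]:
    """Return the user IDs of active subscribers attached to the given record."""
    user_ids = []
    for subscriber in subscribers:
        # Users who opted out are stored with active=False
        if not subscriber.get("active", True):
            continue
        # Only subscribers tied to this record are relevant
        if subscriber.get("record_id") != record_id:
            continue
        user_ids.append(subscriber["user"]["id"])
    return user_ids


subscribers = [
    {"user": {"id": "u1"}, "active": True, "record_id": "rec-1"},
    {"user": {"id": "u2"}, "active": False, "record_id": "rec-1"},
    {"user": {"id": "u3"}, "active": True, "record_id": "rec-2"},
]
print(active_subscriber_user_ids(subscribers, "rec-1"))  # ['u1']
```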
Notification System Object Relationships
A Notification Setting acts as the central hub, creating both Notifications and Notification Subscribers.
In more detail, each Notification and each Notification Subscriber points back to its Notification Setting through the notification_setting field, to a user through the user field, and optionally to a specific record through the record_id field.
🚀 How to Use the System
- Create a notification setting that specifies who you want to alert and how.
- Determine your sending strategy. There are a few ways to trigger notifications; see the How to Send Notifications section.
- (Optional) If your notification setting is configured to create subscribers, you can manage those subscribers to customize who receives notifications for specific records. The available approaches are detailed in the Subscriber management section.
How to Send Notifications
Your options for sending notifications include:
- Directly calling the send_notifications_handler function within your code, passing in the relevant notification setting and data.
- Using record triggers to automatically send notifications when certain events occur on specific records.
- Using scheduled triggers to send notifications at predetermined times or intervals.
- Sending a POST request to the endpoint: https://api.valstorm.com/v1/notifications
- Using the Send Notification node in the automation builder to send notifications as part of an automated workflow.
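For the POST option, a request could be assembled along these lines. Only the endpoint URL comes from this document. The payload fields, the bearer-token header, and the `build_notification_request` helper are assumptions for illustration, so check the API reference for the real request schema and authentication scheme. The sketch builds the request without sending it.

```python
import json
import urllib.request

NOTIFICATIONS_URL = "https://api.valstorm.com/v1/notifications"


def build_notification_request(payload: dict, token: str) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for the notifications endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        NOTIFICATIONS_URL,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Assumption: the API accepts a bearer token
            "Authorization": f"Bearer {token}",
        },
    )


# Hypothetical payload: the field names are placeholders, not a documented schema
request = build_notification_request(
    {"type": "example_type", "data": {"message": "Hello"}},
    token="YOUR_API_TOKEN",
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```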
Subscriber management
Declaring the Users on the Notification Setting
Users and groups can be declared directly on the notification setting. These users will receive notifications when the event is triggered.
Using a Function to Assign Subscribers
You can create a function that dynamically assigns subscribers based on the context of the notification. This function should accept three parameters: data, current_user, and notification_setting.
Important Notes
- The function must return a list of user IDs to be added as subscribers.
- Functions should first check whether the record already has subscribers for the current notification and only create new ones if it does not. Otherwise, the full subscriber lookup runs every time a notification is sent and adds a lot of overhead.
- If our team notices excessive function execution, we will reach out to discuss optimizing your implementation. This may include additional costs.
- To optimize your function, use the following example to check whether the record already has subscribers for this notification type, and only create new ones when it does not.
def _prepare_initial_data_maps(conversation: dict, notification_setting: NotificationSetting) -> set:
    """
    Collects the contact IDs of a conversation.
    Returns an empty set when the conversation already has subscribers for this
    notification setting, so no new subscribers are created.
    """
    contact_ids_to_query = set()
    # Skip conversations that already have subscribers of this TYPE
    for subscriber in conversation.get('subscribers') or []:
        if notification_setting.name in subscriber.get('name', ''):
            return contact_ids_to_query
    # Process all contacts
    contacts = conversation.get('contacts') or []
    for contact in contacts:
        contact_id = contact.get('id')
        contact_ids_to_query.add(contact_id)
    return contact_ids_to_query
data
The data from the notification. The values you want to access are generally one level down, at data.data.
current_user
The current user running the notification event.
notification_setting
The related notification setting that controls this event.
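A minimal skeleton of such a function might look like the following. It assumes the entry point is named `execute`, as in the samples in this document, and it uses a hypothetical `owner` lookup on the record purely for illustration; your own records will have different fields.

```python
def execute(data: dict, current_user, notification_setting) -> list[str]:
    """Return the user IDs that should become subscribers."""
    # The useful payload usually sits one level down, at data["data"]
    record = (data or {}).get("data") or {}
    # Hypothetical field: assume the record carries an "owner" lookup with an "id"
    owner = record.get("owner") or {}
    owner_id = owner.get("id")
    # Always return a list, even when there is nobody to subscribe
    return [owner_id] if owner_id else []


print(execute({"data": {"owner": {"id": "user-123"}}}, current_user=None, notification_setting=None))
# ['user-123']
```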
Functions can be created in the function list.
Here is a sample function that assigns subscribers to a text conversation based on the owner of a Salesforce (Person) Account or Contact record:
from valstorm.models import User, NotificationSetting
from valstorm.salesforce import Salesforce, query_salesforce
from valstorm.dependencies import add_log
from valstorm.query import sql_query
def _prepare_initial_data_maps(conversation: dict, notification_setting: NotificationSetting) -> set:
    """
    Collects the contact IDs of a conversation.
    Returns an empty set when the conversation already has subscribers for this
    notification setting, so no new subscribers are created.
    """
    contact_ids_to_query = set()
    # Skip conversations that already have subscribers of this TYPE
    for subscriber in conversation.get('subscribers') or []:
        if notification_setting.name in subscriber.get('name', ''):
            return contact_ids_to_query
    # Process all contacts
    contacts = conversation.get('contacts') or []
    for contact in contacts:
        contact_id = contact.get('id')
        contact_ids_to_query.add(contact_id)
    return contact_ids_to_query
def _fetch_and_map_salesforce_owners(sf: Salesforce, sf_ids_by_type: dict, current_user: User) -> dict:
    """
    Queries Salesforce for Accounts and Contacts and returns a map of
    Salesforce Record ID -> Owner ID.
    """
    sf_owner_map = {}
    for object_type, ids in sf_ids_by_type.items():
        if not ids:
            continue
        # A one-element tuple would render with a trailing comma, so format it by hand
        ids = tuple(ids) if len(ids) > 1 else f"('{ids[0]}')"
        sf_query = f"SELECT Id, OwnerId FROM {object_type} WHERE Id IN {ids}"
        results = query_salesforce(sf_query, sf)
        if results.status_code != 200:
            add_log(f"Salesforce query failed for {object_type}: {results.text}", 'error')
            continue
        results = results.json()
        for record in results.get('records', []):
            if record.get('Id') and record.get('OwnerId'):
                sf_owner_map[record['Id']] = record['OwnerId']
    return sf_owner_map
def _format_in_clause(ids) -> str:
    """Formats IDs for a SQL IN clause without the trailing comma of a one-element tuple."""
    return "(" + ", ".join(f"'{id_val}'" for id_val in ids) + ")"

def execute(data: dict, current_user: User, notification_setting: NotificationSetting):
    # Receives a twilio_message or twilio_conversation record
    data = data.get('data')
    conversation = data.get('conversation')
    if not conversation:
        # This is a twilio_conversation record
        conversation = data
    else:
        # This is a twilio_message record, so load its parent conversation
        conversation_results = sql_query(
            query=f"SELECT id, contacts, subscribers FROM twilio_conversation WHERE id = '{conversation.get('id')}'",
            current_user=current_user,
            bypass_cache=True,
        )
        conversations = conversation_results.get('records', [])
        if not conversations:
            return []
        conversation = conversations[0]
    contact_ids = _prepare_initial_data_maps(conversation, notification_setting)
    # An empty set means there are no contacts, or subscribers of this type already exist
    if not contact_ids:
        return []
    contacts_result = sql_query(
        query=f"SELECT id, salesforce_id FROM contact WHERE id IN {_format_in_clause(contact_ids)}",
        current_user=current_user,
        bypass_cache=True,
    )
    sf_ids_by_type = {'Account': [], 'Contact': []}
    sf_id_to_contact_id_map = {}
    for contact in contacts_result.get('records', []):
        sf_id = contact.get('salesforce_id')
        if not sf_id:
            continue
        sf_id_to_contact_id_map[sf_id] = contact['id']
        # Salesforce ID prefixes: 001 is an Account, 003 is a Contact
        if sf_id.startswith('001'):
            sf_ids_by_type['Account'].append(sf_id)
        elif sf_id.startswith('003'):
            sf_ids_by_type['Contact'].append(sf_id)
    if not sf_ids_by_type['Account'] and not sf_ids_by_type['Contact']:
        return []
    # Query Salesforce for owners
    sf = Salesforce(current_user)
    sf_owner_map = _fetch_and_map_salesforce_owners(sf, sf_ids_by_type, current_user)
    owner_sf_ids = set(sf_owner_map.values())
    if not owner_sf_ids:
        return []
    # Fetch Valstorm users corresponding to the Salesforce owners
    users_result = sql_query(
        query=f"SELECT id, name, salesforce_id FROM user WHERE salesforce_id IN {_format_in_clause(owner_sf_ids)}",
        current_user=current_user,
    )
    owner_id_to_user_map = {
        user['salesforce_id']: user for user in users_result.get('records', [])
    }
    user_ids = set()
    for sf_id, owner_id in sf_owner_map.items():
        user = owner_id_to_user_map.get(owner_id)
        # Skip Salesforce owners that have no matching Valstorm user
        if user:
            user_ids.add(user['id'])
    # The function must return a list of user IDs
    return list(user_ids)
Here is another, more complicated example of a Salesforce-fed subscription function:
from valstorm.models import User, NotificationSetting
from valstorm.salesforce import Salesforce, query_salesforce
from valstorm.dependencies import add_log
from valstorm.query import sql_query
# Language-specific user groups
ENGLISH_USERS = [{"id": "id-one", "name": "User One"}]
SPANISH_USERS = [{"id": "id-two", "name": "User Two"}]
def _format_in_clause(ids: set | list) -> str:
    """
    Correctly formats a list or set of IDs for a SQL IN clause.
    Handles single-item lists without adding an invalid trailing comma.
    """
    if not ids:
        return "('')"  # Return a valid empty clause to prevent errors
    # Convert all items to strings and wrap them in single quotes
    quoted_ids = [f"'{str(id_val)}'" for id_val in ids]
    # Join them with commas and wrap the result in parentheses
    return f"({','.join(quoted_ids)})"
def _prepare_initial_data_maps(conversation: dict, notification_setting: NotificationSetting) -> set:
    """
    Collects the contact IDs of a conversation.
    Returns an empty set when the conversation already has subscribers for this
    notification setting, so no new subscribers are created.
    """
    contact_ids_to_query = set()
    # Skip conversations that already have subscribers of this TYPE
    for subscriber in conversation.get('subscribers') or []:
        if notification_setting.name in subscriber.get('name', ''):
            return contact_ids_to_query
    # Process all contacts
    contacts = conversation.get('contacts') or []
    for contact in contacts:
        contact_id = contact.get('id')
        contact_ids_to_query.add(contact_id)
    return contact_ids_to_query
def _fetch_and_map_salesforce_records(sf: Salesforce, sf_ids_by_type: dict) -> dict:
    """
    Queries Salesforce for Accounts and Contacts and returns a map of
    Salesforce Record ID -> Full Record Data.
    """
    sf_record_map = {}
    for object_type, ids in sf_ids_by_type.items():
        if not ids:
            continue
        # Format IDs for the SOQL IN clause
        ids_in_clause = _format_in_clause(ids)
        # Define the query with all necessary fields
        sf_query = f"SELECT Id, OwnerId, Intake_Completed_By__c, Primary_Language__c FROM {object_type} WHERE Id IN {ids_in_clause}"
        results = query_salesforce(sf_query, sf)
        if results.status_code != 200:
            add_log(f"Salesforce query failed for {object_type}: {results.text}", 'error')
            continue
        # Map each Salesforce ID to its full record data
        for record in results.json().get('records', []):
            if record.get('Id'):
                sf_record_map[record['Id']] = record
    return sf_record_map
def execute(data: dict, current_user: User, notification_setting: NotificationSetting):
    # Receives a twilio_message or twilio_conversation record
    data = data.get('data')
    conversation = data.get('conversation')
    if not conversation:
        # This is a twilio_conversation record
        conversation = data
    else:
        # This is a twilio_message record, so load its parent conversation
        conversation_results = sql_query(
            query=f"SELECT id, contacts, subscribers FROM twilio_conversation WHERE id = '{conversation.get('id')}'",
            current_user=current_user,
            bypass_cache=True,
        )
        conversations = conversation_results.get('records', [])
        if not conversations:
            return []
        conversation = conversations[0]
    contact_ids = _prepare_initial_data_maps(conversation, notification_setting)
    # An empty set means there are no contacts, or subscribers of this type already exist
    if not contact_ids:
        return []
    contacts_in_clause = _format_in_clause(contact_ids)
    contacts_result = sql_query(
        query=f"SELECT id, salesforce_id FROM contact WHERE id IN {contacts_in_clause}",
        current_user=current_user,
        bypass_cache=True,
    )
    sf_ids_by_type = {'Account': [], 'Contact': []}
    sf_id_to_contact_id_map = {}
    for contact in contacts_result.get('records', []):
        sf_id = contact.get('salesforce_id')
        if not sf_id:
            continue
        sf_id_to_contact_id_map[sf_id] = contact['id']
        # Salesforce ID prefixes: 001 is an Account, 003 is a Contact
        if sf_id.startswith('001'):
            sf_ids_by_type['Account'].append(sf_id)
        elif sf_id.startswith('003'):
            sf_ids_by_type['Contact'].append(sf_id)
    if not sf_ids_by_type['Account'] and not sf_ids_by_type['Contact']:
        return []
    # Query Salesforce for the full records
    sf = Salesforce(current_user)
    sf_record_map = _fetch_and_map_salesforce_records(sf, sf_ids_by_type)
    # Get all potential Valstorm users in one query
    all_sf_user_ids = set()
    for record_data in sf_record_map.values():
        if record_data.get('OwnerId'):
            all_sf_user_ids.add(record_data['OwnerId'])
        if record_data.get('Intake_Completed_By__c'):
            all_sf_user_ids.add(record_data['Intake_Completed_By__c'])
    if not all_sf_user_ids:
        users_result = {'records': []}
    else:
        users_in_clause = _format_in_clause(all_sf_user_ids)
        users_result = sql_query(
            query=f"SELECT id, name, salesforce_id FROM user WHERE salesforce_id IN {users_in_clause}",
            current_user=current_user,
        )
    sf_id_to_user_map = {user['salesforce_id']: user for user in users_result.get('records', [])}
    # Final logic: assemble the user IDs
    user_ids_to_return = set()
    for sf_id, record_data in sf_record_map.items():
        # 1. Get the Owner
        owner_user = sf_id_to_user_map.get(record_data.get('OwnerId'))
        if owner_user:
            user_ids_to_return.add(owner_user['id'])
        # The following logic only applies to Accounts
        if sf_id.startswith('001'):
            # 2. Get the Intake User
            intake_user = sf_id_to_user_map.get(record_data.get('Intake_Completed_By__c'))
            if intake_user:
                user_ids_to_return.add(intake_user['id'])
            # 3. Get the Language-specific Users
            language_users = SPANISH_USERS if record_data.get('Primary_Language__c') == 'Spanish' else ENGLISH_USERS
            for user in language_users:
                user_ids_to_return.add(user['id'])
    return list(user_ids_to_return)
Using Automations to Assign Subscribers
(Coming Soon) You can also use automations to assign subscribers. This allows for more complex logic and workflows to determine who should receive notifications.
Automations can be created in the automation builder.