Notification Specification - NFSandbox/sh_trade_backend GitHub Wiki
Notifications of every type should be stored in the same table. JSON can be used to store the polymorphic content.
Field | Info |
---|---|
notification_id | ID of this notification |
sender_id | Sender id |
receiver_id | Receiver id |
content | JSON content info of this message |
read_time | Time at which this message was marked read, if any |
created_at | Created time of this notification |
Note that the `content` field should work with the discriminator feature of Pydantic. The `content_type` field is used as the discriminator.
The first step is to create a list of Pydantic classes that validate the `content` field of the ORM class.
We should use a list of derived classes as the discriminated union type.
- TextNotification
- UrlActionNotification
- MarkdownNotification
- ...
Every class used as a validator model of `content` should have a unique `content_type` string `Literal`:
from typing import Literal

from pydantic import BaseModel


class NotificationContentOut(BaseModel):
    # Should be unique across all notification content-out model classes.
    content_type: Literal["text"] = "text"
    category: str = "basic"
    ...
Currently supported `content_type` values:
- `text`
- `markdown`
- `url_action`
For more info about these content types, check out the docstrings of their discriminator classes.
The second step is to create a Pydantic class with a discriminator field `content` to validate the whole `Notification` ORM class.
from pydantic import BaseModel, Field

NotificationContentOutUnion = (
    NotificationContentOut
    | MarkDownNotificationContentOut
    | URLActionNotificationContentOut
    | ...
)


class NotificationOut(BaseModel):
    notification_id: int
    ...
    content: NotificationContentOutUnion = Field(discriminator="content_type")
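To make the two steps concrete, here is a minimal, self-contained sketch of a discriminated union. The class names `TextContent`, `UrlActionContent` and `NotificationSketch`, as well as the `message` and `url` payload fields, are hypothetical and are not taken from the project. The sketch assumes only that Pydantic is installed.

```python
from typing import Literal, Union

from pydantic import BaseModel, Field, ValidationError


class TextContent(BaseModel):
    content_type: Literal["text"] = "text"
    message: str  # hypothetical payload field


class UrlActionContent(BaseModel):
    content_type: Literal["url_action"] = "url_action"
    message: str  # hypothetical payload field
    url: str  # hypothetical payload field


# Hypothetical union; the real project union lists its own content classes.
ContentUnion = Union[TextContent, UrlActionContent]


class NotificationSketch(BaseModel):
    notification_id: int
    # Pydantic reads content["content_type"] to pick the matching class.
    content: ContentUnion = Field(discriminator="content_type")


def parse_notification(raw: dict) -> NotificationSketch:
    """Validate a raw dict, such as one built from the JSON column."""
    return NotificationSketch(**raw)


def is_valid(raw: dict) -> bool:
    """Return False instead of raising when validation fails."""
    try:
        parse_notification(raw)
    except ValidationError:
        return False
    return True
```

With this layout, a payload whose `content_type` is `"url_action"` is validated against `UrlActionContent` only, and an unknown `content_type` is rejected outright instead of being tried against every class in the union.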
Endpoints:
- `/get`
- `/read`
- `/read_all`

Provider functions:
- `get_message`
- `send_message`
- `read_message`
- `boardcast_message` (Not Implemented)
Notification logic should not be added directly to providers. Instead, send messages in the endpoint functions, the same place where validity checks are handled.
The `NotificationSender` Python class allows us to add callbacks to different "signals".
Each callback is registered with a key; if the key is `None`, the function's `__name__` is used as the key. Keys must be unique within a signal. In detail, the callbacks of each signal are stored in a `dict[str, CallbackFn]` structure:
cls.callbacks = {
    "before": {
        "callback_1": callable(),
        ...
    },
    "upon": {},
    "after": {},
}
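The registration rules can be sketched as below. `CallbackRegistry` and `add_callback` are hypothetical names used only for illustration, and raising `KeyError` on a duplicate key is an assumption: the text requires keys to be unique within a signal but does not say what happens when they are not.

```python
from typing import Any, Awaitable, Callable, Dict, Optional, Union

CallbackFn = Callable[[Any], Union[Awaitable[Any], Any]]
SIGNALS = ("before", "upon", "after")


class CallbackRegistry:
    """Hypothetical stand-in for the callback storage of NotificationSender."""

    def __init__(self) -> None:
        # One dict[str, CallbackFn] per signal, as in the structure above.
        self.callbacks: Dict[str, Dict[str, CallbackFn]] = {s: {} for s in SIGNALS}

    def add_callback(
        self, signal: str, fn: CallbackFn, key: Optional[str] = None
    ) -> str:
        if signal not in self.callbacks:
            raise ValueError(f"unknown signal: {signal!r}")
        if not callable(fn):
            raise TypeError("callback must be callable")
        # Fall back to the function name when no key is given.
        key = fn.__name__ if key is None else key
        # Assumption: a duplicate key inside one signal is treated as an error.
        if key in self.callbacks[signal]:
            raise KeyError(f"duplicate key {key!r} in signal {signal!r}")
        self.callbacks[signal][key] = fn
        return key
```

Because uniqueness is checked per signal, the same function can be registered under its default key in both "before" and "after" without conflict.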
The callback manager will NOT catch any exceptions raised by callback functions.
The basic flow is as below:
- [Init] Initialize temp variables
- [Trigger] Trigger "before" callback signal
- Validate send args
- Generate ORM class instance
- [Trigger] Trigger "upon" callback signal
- Commit to database
- [Trigger] Trigger "after" signal
- Clear temp variables
- Return
The state at each signal is:
- `before`: Sender and receiver determined
- `upon`: ORM class instance generated, but not yet added to the database
- `after`: ORM class instance added to the database and committed
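The ordering of the flow and the state visible at each signal can be sketched as follows. `FlowSketch` is a hypothetical, simplified model: callbacks are plain synchronous functions, the ORM instance is a dict, the commit is only recorded as a step, and interruption by a callback is not modelled.

```python
from typing import Any, Callable, Dict, List, Optional


class FlowSketch:
    """Hypothetical, simplified model of the send flow (no real database)."""

    def __init__(self) -> None:
        self.callbacks: Dict[str, Dict[str, Callable[["FlowSketch"], Any]]] = {
            "before": {},
            "upon": {},
            "after": {},
        }
        self.steps: List[str] = []  # records the order of the flow
        self.curr_receiver: Optional[str] = None
        self.curr_orm_notification: Optional[dict] = None

    def _trigger(self, signal: str) -> None:
        self.steps.append(f"trigger:{signal}")
        for fn in self.callbacks[signal].values():
            fn(self)

    def send(self, receiver: str, content: str) -> dict:
        self.curr_receiver = receiver  # [Init] temp variables
        self._trigger("before")  # sender and receiver determined
        if not content:  # validate send args
            raise ValueError("content must not be empty")
        # Stand-in for generating the ORM class instance.
        self.curr_orm_notification = {
            "receiver": self.curr_receiver,
            "content": content,
        }
        self._trigger("upon")  # instance exists, not yet committed
        self.steps.append("commit")  # stand-in for the database commit
        self._trigger("after")  # instance committed
        result = self.curr_orm_notification
        self.curr_receiver = None  # clear temp variables
        self.curr_orm_notification = None
        return result
```

A "before" callback in this sketch sees `curr_orm_notification` as `None`, while an "upon" callback sees the generated instance, which matches the states listed above.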
Callback functions comply with the following type:
CallbackFnType = Callable[[NotificationSender], Awaitable[Any] | Any]
This means both sync and async functions are supported.
For callable and async detection, `is_callable()` and `inspect.isawaitable()` are used.
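One way to support both kinds of callback is to call the function first and await the result only when it is awaitable. The helper name `run_callback` is hypothetical, and the sketch uses the built-in `callable()` together with `inspect.isawaitable()` from the standard library; the project's own helpers may differ.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_callback(fn: Callable[[Any], Any], sender: Any) -> Any:
    """Call a sync or async callback and return its final result."""
    if not callable(fn):
        raise TypeError(f"{fn!r} is not callable")
    result = fn(sender)
    # An async function returns a coroutine object, which is awaitable.
    if inspect.isawaitable(result):
        result = await result
    return result
```

Checking the returned value rather than the function itself also covers sync functions that happen to return an awaitable, for example a wrapper around an async function.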
The recommended workflow is to change the `curr_*` members of the received `NotificationSender` instance. For example, to stop a message from being sent to the user with id `1`, we could write a callback function like the one below:
def do_not_send_to_user_1(sender: NotificationSender):
    if sender.curr_receiver.user_id == 1:
        return False
    return None
Another example: redirect messages addressed to user `1` to user `2`:
def redirect_user_1_to_user_2(sender: NotificationSender):
    if sender.curr_receiver.user_id == 1:
        # Replace the receiver before the ORM instance is generated.
        sender.curr_receiver = get_user_by_id_sync(2)
    return None
For time-consuming callbacks, it's not recommended to wait for the task to finish before returning. Instead, consider scheduling the task on the current `asyncio` event loop and returning immediately.
import asyncio


async def callback_func(sender: NotificationSender):
    # Schedule the slow work on the running event loop and return at once.
    loop = asyncio.get_running_loop()
    loop.create_task(_actual_time_consuming_callback(sender))
    return True
One thing to note is that since the actual task runs on the event loop, the attributes of the `sender` instance may change while the task is executing, or some of the ORM instances may be detached from the session that produced them. Even worse, `sender()` may be called again, making the temp values completely different!
These possible changes should be taken into consideration when designing a callback with the above pattern. If you want to use the values as they were when the callback was triggered, one approach is to extract the values you need in the outer callback function and pass them as parameters, instead of passing the `sender` instance directly:
async def callback_fn(sender: NotificationSender):
    try:
        loop = asyncio.get_running_loop()
        loop.create_task(
            # Every argument is read from the sender right here, so the task
            # receives the values as they were when this callback was
            # triggered.
            _actual_time_consuming_task(
                message_sender=sender.curr_sender,
                message_receiver=sender.curr_receiver,
                message_content=sender.curr_content,
                orm_notification=sender.curr_orm_notification,
            )
        )
    except Exception as e:
        ...  # error handling
    logger.debug("Telegram middleware returned")
    return True
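The difference between passing the sender and passing extracted values can be reproduced with a small standalone sketch. `SenderSketch` and the two task functions are hypothetical; the only assumption is that the sender is reused for another message before the scheduled task gets to run.

```python
import asyncio
from typing import List, Tuple


class SenderSketch:
    """Hypothetical stand-in exposing a single curr_* attribute."""

    def __init__(self) -> None:
        self.curr_receiver = "alice"


async def slow_task_live(sender: SenderSketch, seen: List[str]) -> None:
    await asyncio.sleep(0.01)
    seen.append(sender.curr_receiver)  # read late: may already have changed


async def slow_task_snapshot(receiver: str, seen: List[str]) -> None:
    await asyncio.sleep(0.01)
    seen.append(receiver)  # value captured when the task was scheduled


async def demo() -> Tuple[List[str], List[str]]:
    sender = SenderSketch()
    live: List[str] = []
    snap: List[str] = []
    tasks = [
        asyncio.create_task(slow_task_live(sender, live)),
        asyncio.create_task(slow_task_snapshot(sender.curr_receiver, snap)),
    ]
    sender.curr_receiver = "bob"  # the sender is reused for another message
    await asyncio.gather(*tasks)
    return live, snap
```

Running `asyncio.run(demo())` shows the task that holds the sender observing the later receiver, while the task that received the extracted value still sees the original one. Note that extracting a value copies only the reference, so a mutable object such as an ORM instance can still change underneath the task.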
Also, if you need to use the lazy-load feature of the SQLAlchemy ORM, you need to refetch ORM instances, since they may already have been detached from the original session:
async def _actual_time_consuming_task(receiver: orm.User):
    async with session_manager() as ss:
        # Refetch by primary key so the instance is bound to this session.
        receiver = await ss.get(orm.User, receiver.user_id)
        # using receiver here with lazy-load feature.
If you don't want errors raised by callbacks to break the notification sending process, use a `try`/`except` block to catch errors inside the callbacks.
You should also manually handle errors raised in the `_actual_time_consuming_callback` function.
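Instead of repeating the same error handling in every callback, the guard can be factored into a decorator. This is only a sketch: `swallow_errors`, `flaky_callback` and the logger name are hypothetical, and it covers synchronous callbacks only.

```python
import functools
import logging
from typing import Any, Callable

logger = logging.getLogger("notification.callbacks")  # hypothetical logger name


def swallow_errors(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Wrap a synchronous callback so its exceptions are logged, not raised."""

    @functools.wraps(fn)  # keeps fn.__name__, which may serve as the callback key
    def wrapper(sender: Any) -> Any:
        try:
            return fn(sender)
        except Exception:
            logger.exception("callback %s failed", fn.__name__)
            return None  # None is not equal to False, so sending continues

    return wrapper


@swallow_errors
def flaky_callback(sender: Any) -> None:
    raise RuntimeError("external service unavailable")
```

`functools.wraps` matters here because a callback registered without an explicit key is stored under its `__name__`; without it, every wrapped callback would be named `wrapper`.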
If the return value satisfies `<ret_value> == False`, the notification sending process will be interrupted; however, the other callback functions of the same "signal" will still be executed.
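Read this way, triggering a signal runs every registered callback and only afterwards reports whether any of them asked to stop. The function below is a hypothetical sketch of that interpretation for synchronous callbacks, not the project's actual implementation.

```python
from typing import Any, Callable, Dict


def trigger(callbacks: Dict[str, Callable[[Any], Any]], sender: Any) -> bool:
    """Run every callback of one signal; return False if any asked to stop."""
    keep_sending = True
    for fn in callbacks.values():
        ret = fn(sender)
        # Mirrors the `== False` rule literally, so 0 also counts as a stop,
        # while None does not.
        if ret == False:  # noqa: E712
            keep_sending = False  # remember the stop request, keep running the rest
    return keep_sending
```

Because the comparison is `== False` and not `is False`, a callback that returns `0` would also interrupt sending, so returning `None` explicitly, as the examples above do, is the safest way to let a message through.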
By storing `content_type` in a separate column and using it as the discriminator, we get:
- Better SQL-level filtering & querying
- Better OpenAPI doc support
Be careful about Horizontal Privilege Escalation. A user should only have access to the notifications related to themselves.
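A minimal sketch of such a check is shown below, assuming that "related" means the user is the receiver of the notification. `NotificationRow`, `notifications_for` and `can_mark_read` are hypothetical names; in the real service the filter would normally be part of the database query and not applied to rows already loaded into memory.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class NotificationRow:
    """Hypothetical in-memory stand-in for a row of the notification table."""

    notification_id: int
    sender_id: int
    receiver_id: int


def notifications_for(user_id: int, rows: List[NotificationRow]) -> List[NotificationRow]:
    """Return only the notifications addressed to the requesting user."""
    return [row for row in rows if row.receiver_id == user_id]


def can_mark_read(user_id: int, row: NotificationRow) -> bool:
    """Only the receiver may mark a notification as read (assumption)."""
    return row.receiver_id == user_id
```

The important point is that the user id comes from the authenticated session, never from a request parameter, so a user cannot read or mark notifications by guessing another user's `notification_id`.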
Currently the notification system does NOT support broadcast messages, which means each message has a one-to-one structure.