saved_message_service.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from typing import Optional, Union
  2. from extensions.ext_database import db
  3. from libs.infinite_scroll_pagination import InfiniteScrollPagination
  4. from models.account import Account
  5. from models.model import App, EndUser
  6. from models.web import SavedMessage
  7. from services.message_service import MessageService
  8. class SavedMessageService:
  9. @classmethod
  10. def pagination_by_last_id(
  11. cls, app_model: App, user: Optional[Union[Account, EndUser]], last_id: Optional[str], limit: int
  12. ) -> InfiniteScrollPagination:
  13. saved_messages = (
  14. db.session.query(SavedMessage)
  15. .filter(
  16. SavedMessage.app_id == app_model.id,
  17. SavedMessage.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  18. SavedMessage.created_by == user.id,
  19. )
  20. .order_by(SavedMessage.created_at.desc())
  21. .all()
  22. )
  23. message_ids = [sm.message_id for sm in saved_messages]
  24. return MessageService.pagination_by_last_id(
  25. app_model=app_model, user=user, last_id=last_id, limit=limit, include_ids=message_ids
  26. )
  27. @classmethod
  28. def save(cls, app_model: App, user: Optional[Union[Account, EndUser]], message_id: str):
  29. saved_message = (
  30. db.session.query(SavedMessage)
  31. .filter(
  32. SavedMessage.app_id == app_model.id,
  33. SavedMessage.message_id == message_id,
  34. SavedMessage.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  35. SavedMessage.created_by == user.id,
  36. )
  37. .first()
  38. )
  39. if saved_message:
  40. return
  41. message = MessageService.get_message(app_model=app_model, user=user, message_id=message_id)
  42. saved_message = SavedMessage(
  43. app_id=app_model.id,
  44. message_id=message.id,
  45. created_by_role="account" if isinstance(user, Account) else "end_user",
  46. created_by=user.id,
  47. )
  48. db.session.add(saved_message)
  49. db.session.commit()
  50. @classmethod
  51. def delete(cls, app_model: App, user: Optional[Union[Account, EndUser]], message_id: str):
  52. saved_message = (
  53. db.session.query(SavedMessage)
  54. .filter(
  55. SavedMessage.app_id == app_model.id,
  56. SavedMessage.message_id == message_id,
  57. SavedMessage.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  58. SavedMessage.created_by == user.id,
  59. )
  60. .first()
  61. )
  62. if not saved_message:
  63. return
  64. db.session.delete(saved_message)
  65. db.session.commit()