web_conversation_service.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. from typing import Optional, Union
  2. from core.app.entities.app_invoke_entities import InvokeFrom
  3. from extensions.ext_database import db
  4. from libs.infinite_scroll_pagination import InfiniteScrollPagination
  5. from models.account import Account
  6. from models.model import App, EndUser
  7. from models.web import PinnedConversation
  8. from services.conversation_service import ConversationService
  9. class WebConversationService:
  10. @classmethod
  11. def pagination_by_last_id(
  12. cls,
  13. app_model: App,
  14. user: Optional[Union[Account, EndUser]],
  15. last_id: Optional[str],
  16. limit: int,
  17. invoke_from: InvokeFrom,
  18. pinned: Optional[bool] = None,
  19. sort_by="-updated_at",
  20. ) -> InfiniteScrollPagination:
  21. include_ids = None
  22. exclude_ids = None
  23. if pinned is not None:
  24. pinned_conversations = (
  25. db.session.query(PinnedConversation)
  26. .filter(
  27. PinnedConversation.app_id == app_model.id,
  28. PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  29. PinnedConversation.created_by == user.id,
  30. )
  31. .order_by(PinnedConversation.created_at.desc())
  32. .all()
  33. )
  34. pinned_conversation_ids = [pc.conversation_id for pc in pinned_conversations]
  35. if pinned:
  36. include_ids = pinned_conversation_ids
  37. else:
  38. exclude_ids = pinned_conversation_ids
  39. return ConversationService.pagination_by_last_id(
  40. app_model=app_model,
  41. user=user,
  42. last_id=last_id,
  43. limit=limit,
  44. invoke_from=invoke_from,
  45. include_ids=include_ids,
  46. exclude_ids=exclude_ids,
  47. sort_by=sort_by,
  48. )
  49. @classmethod
  50. def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
  51. pinned_conversation = (
  52. db.session.query(PinnedConversation)
  53. .filter(
  54. PinnedConversation.app_id == app_model.id,
  55. PinnedConversation.conversation_id == conversation_id,
  56. PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  57. PinnedConversation.created_by == user.id,
  58. )
  59. .first()
  60. )
  61. if pinned_conversation:
  62. return
  63. conversation = ConversationService.get_conversation(
  64. app_model=app_model, conversation_id=conversation_id, user=user
  65. )
  66. pinned_conversation = PinnedConversation(
  67. app_id=app_model.id,
  68. conversation_id=conversation.id,
  69. created_by_role="account" if isinstance(user, Account) else "end_user",
  70. created_by=user.id,
  71. )
  72. db.session.add(pinned_conversation)
  73. db.session.commit()
  74. @classmethod
  75. def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
  76. pinned_conversation = (
  77. db.session.query(PinnedConversation)
  78. .filter(
  79. PinnedConversation.app_id == app_model.id,
  80. PinnedConversation.conversation_id == conversation_id,
  81. PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  82. PinnedConversation.created_by == user.id,
  83. )
  84. .first()
  85. )
  86. if not pinned_conversation:
  87. return
  88. db.session.delete(pinned_conversation)
  89. db.session.commit()