# web_conversation_service.py (3.3 KB)
  1. from typing import Optional, Union
  2. from core.app.entities.app_invoke_entities import InvokeFrom
  3. from extensions.ext_database import db
  4. from libs.infinite_scroll_pagination import InfiniteScrollPagination
  5. from models.account import Account
  6. from models.model import App, EndUser
  7. from models.web import PinnedConversation
  8. from services.conversation_service import ConversationService
  9. class WebConversationService:
  10. @classmethod
  11. def pagination_by_last_id(cls, app_model: App, user: Optional[Union[Account, EndUser]],
  12. last_id: Optional[str], limit: int, invoke_from: InvokeFrom,
  13. pinned: Optional[bool] = None) -> InfiniteScrollPagination:
  14. include_ids = None
  15. exclude_ids = None
  16. if pinned is not None:
  17. pinned_conversations = db.session.query(PinnedConversation).filter(
  18. PinnedConversation.app_id == app_model.id,
  19. PinnedConversation.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
  20. PinnedConversation.created_by == user.id
  21. ).order_by(PinnedConversation.created_at.desc()).all()
  22. pinned_conversation_ids = [pc.conversation_id for pc in pinned_conversations]
  23. if pinned:
  24. include_ids = pinned_conversation_ids
  25. else:
  26. exclude_ids = pinned_conversation_ids
  27. return ConversationService.pagination_by_last_id(
  28. app_model=app_model,
  29. user=user,
  30. last_id=last_id,
  31. limit=limit,
  32. invoke_from=invoke_from,
  33. include_ids=include_ids,
  34. exclude_ids=exclude_ids,
  35. )
  36. @classmethod
  37. def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
  38. pinned_conversation = db.session.query(PinnedConversation).filter(
  39. PinnedConversation.app_id == app_model.id,
  40. PinnedConversation.conversation_id == conversation_id,
  41. PinnedConversation.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
  42. PinnedConversation.created_by == user.id
  43. ).first()
  44. if pinned_conversation:
  45. return
  46. conversation = ConversationService.get_conversation(
  47. app_model=app_model,
  48. conversation_id=conversation_id,
  49. user=user
  50. )
  51. pinned_conversation = PinnedConversation(
  52. app_id=app_model.id,
  53. conversation_id=conversation.id,
  54. created_by_role='account' if isinstance(user, Account) else 'end_user',
  55. created_by=user.id
  56. )
  57. db.session.add(pinned_conversation)
  58. db.session.commit()
  59. @classmethod
  60. def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
  61. pinned_conversation = db.session.query(PinnedConversation).filter(
  62. PinnedConversation.app_id == app_model.id,
  63. PinnedConversation.conversation_id == conversation_id,
  64. PinnedConversation.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
  65. PinnedConversation.created_by == user.id
  66. ).first()
  67. if not pinned_conversation:
  68. return
  69. db.session.delete(pinned_conversation)
  70. db.session.commit()