data_source.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. import datetime
  2. import json
  3. from cachetools import TTLCache
  4. from flask import request, current_app
  5. from flask_login import current_user
  6. from core.login.login import login_required
  7. from flask_restful import Resource, marshal_with, fields, reqparse, marshal
  8. from werkzeug.exceptions import NotFound
  9. from controllers.console import api
  10. from controllers.console.setup import setup_required
  11. from controllers.console.wraps import account_initialization_required
  12. from core.data_loader.loader.notion import NotionLoader
  13. from core.indexing_runner import IndexingRunner
  14. from extensions.ext_database import db
  15. from fields.data_source_fields import integrate_notion_info_list_fields, integrate_list_fields
  16. from libs.helper import TimestampField
  17. from models.dataset import Document
  18. from models.source import DataSourceBinding
  19. from services.dataset_service import DatasetService, DocumentService
  20. from tasks.document_indexing_sync_task import document_indexing_sync_task
  21. cache = TTLCache(maxsize=None, ttl=30)
  22. class DataSourceApi(Resource):
  23. @setup_required
  24. @login_required
  25. @account_initialization_required
  26. @marshal_with(integrate_list_fields)
  27. def get(self):
  28. # get workspace data source integrates
  29. data_source_integrates = db.session.query(DataSourceBinding).filter(
  30. DataSourceBinding.tenant_id == current_user.current_tenant_id,
  31. DataSourceBinding.disabled == False
  32. ).all()
  33. base_url = request.url_root.rstrip('/')
  34. data_source_oauth_base_path = "/console/api/oauth/data-source"
  35. providers = ["notion"]
  36. integrate_data = []
  37. for provider in providers:
  38. # existing_integrate = next((ai for ai in data_source_integrates if ai.provider == provider), None)
  39. existing_integrates = filter(lambda item: item.provider == provider, data_source_integrates)
  40. if existing_integrates:
  41. for existing_integrate in list(existing_integrates):
  42. integrate_data.append({
  43. 'id': existing_integrate.id,
  44. 'provider': provider,
  45. 'created_at': existing_integrate.created_at,
  46. 'is_bound': True,
  47. 'disabled': existing_integrate.disabled,
  48. 'source_info': existing_integrate.source_info,
  49. 'link': f'{base_url}{data_source_oauth_base_path}/{provider}'
  50. })
  51. else:
  52. integrate_data.append({
  53. 'id': None,
  54. 'provider': provider,
  55. 'created_at': None,
  56. 'source_info': None,
  57. 'is_bound': False,
  58. 'disabled': None,
  59. 'link': f'{base_url}{data_source_oauth_base_path}/{provider}'
  60. })
  61. return {'data': integrate_data}, 200
  62. @setup_required
  63. @login_required
  64. @account_initialization_required
  65. def patch(self, binding_id, action):
  66. binding_id = str(binding_id)
  67. action = str(action)
  68. data_source_binding = DataSourceBinding.query.filter_by(
  69. id=binding_id
  70. ).first()
  71. if data_source_binding is None:
  72. raise NotFound('Data source binding not found.')
  73. # enable binding
  74. if action == 'enable':
  75. if data_source_binding.disabled:
  76. data_source_binding.disabled = False
  77. data_source_binding.updated_at = datetime.datetime.utcnow()
  78. db.session.add(data_source_binding)
  79. db.session.commit()
  80. else:
  81. raise ValueError('Data source is not disabled.')
  82. # disable binding
  83. if action == 'disable':
  84. if not data_source_binding.disabled:
  85. data_source_binding.disabled = True
  86. data_source_binding.updated_at = datetime.datetime.utcnow()
  87. db.session.add(data_source_binding)
  88. db.session.commit()
  89. else:
  90. raise ValueError('Data source is disabled.')
  91. return {'result': 'success'}, 200
  92. class DataSourceNotionListApi(Resource):
  93. @setup_required
  94. @login_required
  95. @account_initialization_required
  96. @marshal_with(integrate_notion_info_list_fields)
  97. def get(self):
  98. dataset_id = request.args.get('dataset_id', default=None, type=str)
  99. exist_page_ids = []
  100. # import notion in the exist dataset
  101. if dataset_id:
  102. dataset = DatasetService.get_dataset(dataset_id)
  103. if not dataset:
  104. raise NotFound('Dataset not found.')
  105. if dataset.data_source_type != 'notion_import':
  106. raise ValueError('Dataset is not notion type.')
  107. documents = Document.query.filter_by(
  108. dataset_id=dataset_id,
  109. tenant_id=current_user.current_tenant_id,
  110. data_source_type='notion_import',
  111. enabled=True
  112. ).all()
  113. if documents:
  114. for document in documents:
  115. data_source_info = json.loads(document.data_source_info)
  116. exist_page_ids.append(data_source_info['notion_page_id'])
  117. # get all authorized pages
  118. data_source_bindings = DataSourceBinding.query.filter_by(
  119. tenant_id=current_user.current_tenant_id,
  120. provider='notion',
  121. disabled=False
  122. ).all()
  123. if not data_source_bindings:
  124. return {
  125. 'notion_info': []
  126. }, 200
  127. pre_import_info_list = []
  128. for data_source_binding in data_source_bindings:
  129. source_info = data_source_binding.source_info
  130. pages = source_info['pages']
  131. # Filter out already bound pages
  132. for page in pages:
  133. if page['page_id'] in exist_page_ids:
  134. page['is_bound'] = True
  135. else:
  136. page['is_bound'] = False
  137. pre_import_info = {
  138. 'workspace_name': source_info['workspace_name'],
  139. 'workspace_icon': source_info['workspace_icon'],
  140. 'workspace_id': source_info['workspace_id'],
  141. 'pages': pages,
  142. }
  143. pre_import_info_list.append(pre_import_info)
  144. return {
  145. 'notion_info': pre_import_info_list
  146. }, 200
  147. class DataSourceNotionApi(Resource):
  148. @setup_required
  149. @login_required
  150. @account_initialization_required
  151. def get(self, workspace_id, page_id, page_type):
  152. workspace_id = str(workspace_id)
  153. page_id = str(page_id)
  154. data_source_binding = DataSourceBinding.query.filter(
  155. db.and_(
  156. DataSourceBinding.tenant_id == current_user.current_tenant_id,
  157. DataSourceBinding.provider == 'notion',
  158. DataSourceBinding.disabled == False,
  159. DataSourceBinding.source_info['workspace_id'] == f'"{workspace_id}"'
  160. )
  161. ).first()
  162. if not data_source_binding:
  163. raise NotFound('Data source binding not found.')
  164. loader = NotionLoader(
  165. notion_access_token=data_source_binding.access_token,
  166. notion_workspace_id=workspace_id,
  167. notion_obj_id=page_id,
  168. notion_page_type=page_type
  169. )
  170. text_docs = loader.load()
  171. return {
  172. 'content': "\n".join([doc.page_content for doc in text_docs])
  173. }, 200
  174. @setup_required
  175. @login_required
  176. @account_initialization_required
  177. def post(self):
  178. parser = reqparse.RequestParser()
  179. parser.add_argument('notion_info_list', type=list, required=True, nullable=True, location='json')
  180. parser.add_argument('process_rule', type=dict, required=True, nullable=True, location='json')
  181. args = parser.parse_args()
  182. # validate args
  183. DocumentService.estimate_args_validate(args)
  184. indexing_runner = IndexingRunner()
  185. response = indexing_runner.notion_indexing_estimate(current_user.current_tenant_id, args['notion_info_list'], args['process_rule'])
  186. return response, 200
  187. class DataSourceNotionDatasetSyncApi(Resource):
  188. @setup_required
  189. @login_required
  190. @account_initialization_required
  191. def get(self, dataset_id):
  192. dataset_id_str = str(dataset_id)
  193. dataset = DatasetService.get_dataset(dataset_id_str)
  194. if dataset is None:
  195. raise NotFound("Dataset not found.")
  196. documents = DocumentService.get_document_by_dataset_id(dataset_id_str)
  197. for document in documents:
  198. document_indexing_sync_task.delay(dataset_id_str, document.id)
  199. return 200
  200. class DataSourceNotionDocumentSyncApi(Resource):
  201. @setup_required
  202. @login_required
  203. @account_initialization_required
  204. def get(self, dataset_id, document_id):
  205. dataset_id_str = str(dataset_id)
  206. document_id_str = str(document_id)
  207. dataset = DatasetService.get_dataset(dataset_id_str)
  208. if dataset is None:
  209. raise NotFound("Dataset not found.")
  210. document = DocumentService.get_document(dataset_id_str, document_id_str)
  211. if document is None:
  212. raise NotFound("Document not found.")
  213. document_indexing_sync_task.delay(dataset_id_str, document_id_str)
  214. return 200
  215. api.add_resource(DataSourceApi, '/data-source/integrates', '/data-source/integrates/<uuid:binding_id>/<string:action>')
  216. api.add_resource(DataSourceNotionListApi, '/notion/pre-import/pages')
  217. api.add_resource(DataSourceNotionApi,
  218. '/notion/workspaces/<uuid:workspace_id>/pages/<uuid:page_id>/<string:page_type>/preview',
  219. '/datasets/notion-indexing-estimate')
  220. api.add_resource(DataSourceNotionDatasetSyncApi, '/datasets/<uuid:dataset_id>/notion/sync')
  221. api.add_resource(DataSourceNotionDocumentSyncApi, '/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/notion/sync')