data_source.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import datetime
  2. import json
  3. from flask import request
  4. from flask_login import current_user
  5. from flask_restful import Resource, marshal_with, reqparse
  6. from werkzeug.exceptions import NotFound
  7. from controllers.console import api
  8. from controllers.console.setup import setup_required
  9. from controllers.console.wraps import account_initialization_required
  10. from core.indexing_runner import IndexingRunner
  11. from core.rag.extractor.entity.extract_setting import ExtractSetting
  12. from core.rag.extractor.notion_extractor import NotionExtractor
  13. from extensions.ext_database import db
  14. from fields.data_source_fields import integrate_list_fields, integrate_notion_info_list_fields
  15. from libs.login import login_required
  16. from models.dataset import Document
  17. from models.source import DataSourceBinding
  18. from services.dataset_service import DatasetService, DocumentService
  19. from tasks.document_indexing_sync_task import document_indexing_sync_task
  20. class DataSourceApi(Resource):
  21. @setup_required
  22. @login_required
  23. @account_initialization_required
  24. @marshal_with(integrate_list_fields)
  25. def get(self):
  26. # get workspace data source integrates
  27. data_source_integrates = db.session.query(DataSourceBinding).filter(
  28. DataSourceBinding.tenant_id == current_user.current_tenant_id,
  29. DataSourceBinding.disabled == False
  30. ).all()
  31. base_url = request.url_root.rstrip('/')
  32. data_source_oauth_base_path = "/console/api/oauth/data-source"
  33. providers = ["notion"]
  34. integrate_data = []
  35. for provider in providers:
  36. # existing_integrate = next((ai for ai in data_source_integrates if ai.provider == provider), None)
  37. existing_integrates = filter(lambda item: item.provider == provider, data_source_integrates)
  38. if existing_integrates:
  39. for existing_integrate in list(existing_integrates):
  40. integrate_data.append({
  41. 'id': existing_integrate.id,
  42. 'provider': provider,
  43. 'created_at': existing_integrate.created_at,
  44. 'is_bound': True,
  45. 'disabled': existing_integrate.disabled,
  46. 'source_info': existing_integrate.source_info,
  47. 'link': f'{base_url}{data_source_oauth_base_path}/{provider}'
  48. })
  49. else:
  50. integrate_data.append({
  51. 'id': None,
  52. 'provider': provider,
  53. 'created_at': None,
  54. 'source_info': None,
  55. 'is_bound': False,
  56. 'disabled': None,
  57. 'link': f'{base_url}{data_source_oauth_base_path}/{provider}'
  58. })
  59. return {'data': integrate_data}, 200
  60. @setup_required
  61. @login_required
  62. @account_initialization_required
  63. def patch(self, binding_id, action):
  64. binding_id = str(binding_id)
  65. action = str(action)
  66. data_source_binding = DataSourceBinding.query.filter_by(
  67. id=binding_id
  68. ).first()
  69. if data_source_binding is None:
  70. raise NotFound('Data source binding not found.')
  71. # enable binding
  72. if action == 'enable':
  73. if data_source_binding.disabled:
  74. data_source_binding.disabled = False
  75. data_source_binding.updated_at = datetime.datetime.utcnow()
  76. db.session.add(data_source_binding)
  77. db.session.commit()
  78. else:
  79. raise ValueError('Data source is not disabled.')
  80. # disable binding
  81. if action == 'disable':
  82. if not data_source_binding.disabled:
  83. data_source_binding.disabled = True
  84. data_source_binding.updated_at = datetime.datetime.utcnow()
  85. db.session.add(data_source_binding)
  86. db.session.commit()
  87. else:
  88. raise ValueError('Data source is disabled.')
  89. return {'result': 'success'}, 200
  90. class DataSourceNotionListApi(Resource):
  91. @setup_required
  92. @login_required
  93. @account_initialization_required
  94. @marshal_with(integrate_notion_info_list_fields)
  95. def get(self):
  96. dataset_id = request.args.get('dataset_id', default=None, type=str)
  97. exist_page_ids = []
  98. # import notion in the exist dataset
  99. if dataset_id:
  100. dataset = DatasetService.get_dataset(dataset_id)
  101. if not dataset:
  102. raise NotFound('Dataset not found.')
  103. if dataset.data_source_type != 'notion_import':
  104. raise ValueError('Dataset is not notion type.')
  105. documents = Document.query.filter_by(
  106. dataset_id=dataset_id,
  107. tenant_id=current_user.current_tenant_id,
  108. data_source_type='notion_import',
  109. enabled=True
  110. ).all()
  111. if documents:
  112. for document in documents:
  113. data_source_info = json.loads(document.data_source_info)
  114. exist_page_ids.append(data_source_info['notion_page_id'])
  115. # get all authorized pages
  116. data_source_bindings = DataSourceBinding.query.filter_by(
  117. tenant_id=current_user.current_tenant_id,
  118. provider='notion',
  119. disabled=False
  120. ).all()
  121. if not data_source_bindings:
  122. return {
  123. 'notion_info': []
  124. }, 200
  125. pre_import_info_list = []
  126. for data_source_binding in data_source_bindings:
  127. source_info = data_source_binding.source_info
  128. pages = source_info['pages']
  129. # Filter out already bound pages
  130. for page in pages:
  131. if page['page_id'] in exist_page_ids:
  132. page['is_bound'] = True
  133. else:
  134. page['is_bound'] = False
  135. pre_import_info = {
  136. 'workspace_name': source_info['workspace_name'],
  137. 'workspace_icon': source_info['workspace_icon'],
  138. 'workspace_id': source_info['workspace_id'],
  139. 'pages': pages,
  140. }
  141. pre_import_info_list.append(pre_import_info)
  142. return {
  143. 'notion_info': pre_import_info_list
  144. }, 200
  145. class DataSourceNotionApi(Resource):
  146. @setup_required
  147. @login_required
  148. @account_initialization_required
  149. def get(self, workspace_id, page_id, page_type):
  150. workspace_id = str(workspace_id)
  151. page_id = str(page_id)
  152. data_source_binding = DataSourceBinding.query.filter(
  153. db.and_(
  154. DataSourceBinding.tenant_id == current_user.current_tenant_id,
  155. DataSourceBinding.provider == 'notion',
  156. DataSourceBinding.disabled == False,
  157. DataSourceBinding.source_info['workspace_id'] == f'"{workspace_id}"'
  158. )
  159. ).first()
  160. if not data_source_binding:
  161. raise NotFound('Data source binding not found.')
  162. extractor = NotionExtractor(
  163. notion_workspace_id=workspace_id,
  164. notion_obj_id=page_id,
  165. notion_page_type=page_type,
  166. notion_access_token=data_source_binding.access_token,
  167. tenant_id=current_user.current_tenant_id
  168. )
  169. text_docs = extractor.extract()
  170. return {
  171. 'content': "\n".join([doc.page_content for doc in text_docs])
  172. }, 200
  173. @setup_required
  174. @login_required
  175. @account_initialization_required
  176. def post(self):
  177. parser = reqparse.RequestParser()
  178. parser.add_argument('notion_info_list', type=list, required=True, nullable=True, location='json')
  179. parser.add_argument('process_rule', type=dict, required=True, nullable=True, location='json')
  180. parser.add_argument('doc_form', type=str, default='text_model', required=False, nullable=False, location='json')
  181. parser.add_argument('doc_language', type=str, default='English', required=False, nullable=False, location='json')
  182. args = parser.parse_args()
  183. # validate args
  184. DocumentService.estimate_args_validate(args)
  185. notion_info_list = args['notion_info_list']
  186. extract_settings = []
  187. for notion_info in notion_info_list:
  188. workspace_id = notion_info['workspace_id']
  189. for page in notion_info['pages']:
  190. extract_setting = ExtractSetting(
  191. datasource_type="notion_import",
  192. notion_info={
  193. "notion_workspace_id": workspace_id,
  194. "notion_obj_id": page['page_id'],
  195. "notion_page_type": page['type'],
  196. "tenant_id": current_user.current_tenant_id
  197. },
  198. document_model=args['doc_form']
  199. )
  200. extract_settings.append(extract_setting)
  201. indexing_runner = IndexingRunner()
  202. response = indexing_runner.indexing_estimate(current_user.current_tenant_id, extract_settings,
  203. args['process_rule'], args['doc_form'],
  204. args['doc_language'])
  205. return response, 200
  206. class DataSourceNotionDatasetSyncApi(Resource):
  207. @setup_required
  208. @login_required
  209. @account_initialization_required
  210. def get(self, dataset_id):
  211. dataset_id_str = str(dataset_id)
  212. dataset = DatasetService.get_dataset(dataset_id_str)
  213. if dataset is None:
  214. raise NotFound("Dataset not found.")
  215. documents = DocumentService.get_document_by_dataset_id(dataset_id_str)
  216. for document in documents:
  217. document_indexing_sync_task.delay(dataset_id_str, document.id)
  218. return 200
  219. class DataSourceNotionDocumentSyncApi(Resource):
  220. @setup_required
  221. @login_required
  222. @account_initialization_required
  223. def get(self, dataset_id, document_id):
  224. dataset_id_str = str(dataset_id)
  225. document_id_str = str(document_id)
  226. dataset = DatasetService.get_dataset(dataset_id_str)
  227. if dataset is None:
  228. raise NotFound("Dataset not found.")
  229. document = DocumentService.get_document(dataset_id_str, document_id_str)
  230. if document is None:
  231. raise NotFound("Document not found.")
  232. document_indexing_sync_task.delay(dataset_id_str, document_id_str)
  233. return 200
  234. api.add_resource(DataSourceApi, '/data-source/integrates', '/data-source/integrates/<uuid:binding_id>/<string:action>')
  235. api.add_resource(DataSourceNotionListApi, '/notion/pre-import/pages')
  236. api.add_resource(DataSourceNotionApi,
  237. '/notion/workspaces/<uuid:workspace_id>/pages/<uuid:page_id>/<string:page_type>/preview',
  238. '/datasets/notion-indexing-estimate')
  239. api.add_resource(DataSourceNotionDatasetSyncApi, '/datasets/<uuid:dataset_id>/notion/sync')
  240. api.add_resource(DataSourceNotionDocumentSyncApi, '/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/notion/sync')