keywords.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from core.moderation.base import Moderation, ModerationAction, ModerationInputsResult, ModerationOutputsResult
  2. class KeywordsModeration(Moderation):
  3. name: str = "keywords"
  4. @classmethod
  5. def validate_config(cls, tenant_id: str, config: dict) -> None:
  6. """
  7. Validate the incoming form config data.
  8. :param tenant_id: the id of workspace
  9. :param config: the form config data
  10. :return:
  11. """
  12. cls._validate_inputs_and_outputs_config(config, True)
  13. if not config.get("keywords"):
  14. raise ValueError("keywords is required")
  15. if len(config.get("keywords")) > 1000:
  16. raise ValueError("keywords length must be less than 1000")
  17. def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
  18. flagged = False
  19. preset_response = ""
  20. if self.config['inputs_config']['enabled']:
  21. preset_response = self.config['inputs_config']['preset_response']
  22. if query:
  23. inputs['query__'] = query
  24. # Filter out empty values
  25. keywords_list = [keyword for keyword in self.config['keywords'].split('\n') if keyword]
  26. flagged = self._is_violated(inputs, keywords_list)
  27. return ModerationInputsResult(flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response)
  28. def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
  29. flagged = False
  30. preset_response = ""
  31. if self.config['outputs_config']['enabled']:
  32. # Filter out empty values
  33. keywords_list = [keyword for keyword in self.config['keywords'].split('\n') if keyword]
  34. flagged = self._is_violated({'text': text}, keywords_list)
  35. preset_response = self.config['outputs_config']['preset_response']
  36. return ModerationOutputsResult(flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response)
  37. def _is_violated(self, inputs: dict, keywords_list: list) -> bool:
  38. for value in inputs.values():
  39. if self._check_keywords_in_value(keywords_list, value):
  40. return True
  41. return False
  42. def _check_keywords_in_value(self, keywords_list, value):
  43. for keyword in keywords_list:
  44. if keyword.lower() in value.lower():
  45. return True
  46. return False