123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import os.path as osp
- import tempfile
- import unittest.mock as mock
- import cv2
- import paddle
- import paddlers as pdrs
- from testing_utils import CommonTest, run_script
class TestPredictor(CommonTest):
    """
    Base class for the predictor tests.

    A subclass points `MODULE` at a module whose `__all__` lists trainer
    class names, optionally fills in `TRAINER_NAME_TO_EXPORT_OPTS`, and
    implements `check_predictor()`. Decorating the subclass with
    `TestPredictor.add_tests` attaches one `test_<trainer_name>` method per
    name found in `MODULE.__all__`.
    """

    # Module that is searched (via `__all__` and `getattr`) for trainer classes.
    MODULE = pdrs.tasks
    # Extra command-line options appended to the export command, keyed by
    # trainer name. The '_default' entry, if present, is used for trainers
    # that have no entry of their own.
    TRAINER_NAME_TO_EXPORT_OPTS = {}

    @staticmethod
    def add_tests(cls):
        """
        Automatically patch testing functions to cls.

        For every name in `cls.MODULE.__all__`, a method called
        `test_<name>` is set on `cls`. Each generated test builds the
        trainer with default parameters, saves it, exports it by running
        `export_model.py`, constructs a `pdrs.deploy.Predictor` from the
        exported directory and hands both objects to `check_predictor()`.

        Args:
            cls: The test class to patch.

        Returns:
            The same class object, so that this function can be used as a
            class decorator.
        """

        def _test_predictor(trainer_name):
            # A factory is used so that each generated test captures its own
            # `trainer_name` instead of the loop variable.
            def _test_predictor_impl(self):
                trainer_class = getattr(self.MODULE, trainer_name)
                # Construct trainer with default parameters
                trainer = trainer_class()
                with tempfile.TemporaryDirectory() as td:
                    dynamic_model_dir = osp.join(td, "dynamic")
                    static_model_dir = osp.join(td, "static")
                    # HACK: BaseModel.save_model() requires BaseModel().optimizer to be set
                    optimizer = mock.Mock()
                    optimizer.state_dict.return_value = {'foo': 'bar'}
                    trainer.optimizer = optimizer
                    trainer.save_model(dynamic_model_dir)
                    export_cmd = f"python export_model.py --model_dir {dynamic_model_dir} --save_dir {static_model_dir} "
                    # Trainer-specific options win over '_default'; nothing is
                    # appended when neither key is present.
                    export_opts = self.TRAINER_NAME_TO_EXPORT_OPTS
                    export_cmd += export_opts.get(
                        trainer_name, export_opts.get('_default', ''))
                    run_script(export_cmd, wd="../deploy/export")
                    # Construct predictor
                    # TODO: Test trt and mkl
                    predictor = pdrs.deploy.Predictor(
                        static_model_dir,
                        use_gpu=paddle.device.get_device().startswith('gpu'))
                    self.check_predictor(predictor, trainer)

            return _test_predictor_impl

        for trainer_name in cls.MODULE.__all__:
            setattr(cls, 'test_' + trainer_name, _test_predictor(trainer_name))

        return cls

    def check_predictor(self, predictor, trainer):
        """
        Compare the behavior of `predictor` with that of `trainer`.

        Must be overridden by subclasses.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def check_dict_equal(self, dict_, expected_dict):
        """
        Recursively check that two prediction results are equal.

        Args:
            dict_: A dict, or a (possibly nested) list of dicts.
            expected_dict: The reference value; must have the same structure
                as `dict_`.

        Lists are compared element by element after their lengths are
        checked. Dicts must have equal key sets; each pair of values is then
        passed to `self.check_output_equal()`, which is not defined in this
        class (presumably inherited from `CommonTest` -- verify there).
        """
        if isinstance(dict_, list):
            self.assertIsInstance(expected_dict, list)
            self.assertEqual(len(dict_), len(expected_dict))
            for d1, d2 in zip(dict_, expected_dict):
                self.check_dict_equal(d1, d2)
        else:
            # Use unittest assertions rather than bare `assert` so the checks
            # survive `python -O` and are reported like the list-branch checks.
            self.assertIsInstance(dict_, dict)
            self.assertIsInstance(expected_dict, dict)
            self.assertEqual(dict_.keys(), expected_dict.keys())
            for key in dict_:
                self.check_output_equal(dict_[key], expected_dict[key])
@TestPredictor.add_tests
class TestCDPredictor(TestPredictor):
    """
    Predictor tests for the trainers listed in `pdrs.tasks.change_detector`.
    """

    MODULE = pdrs.tasks.change_detector
    TRAINER_NAME_TO_EXPORT_OPTS = {
        'BIT': "--fixed_input_shape [1,3,256,256]",
        '_default': "--fixed_input_shape [-1,3,256,256]"
    }

    def check_predictor(self, predictor, trainer):
        """
        Check that `predictor` and `trainer` give equal results on image pairs.

        Inputs are given as a single pair and as lists of pairs, both as
        file paths and as float32 arrays read with `cv2.imread`.

        Args:
            predictor: Object whose `predict()` is under test.
            trainer: Object whose `predict()` output serves as the reference.
        """
        path_t1 = "data/ssmt/optical_t1.bmp"
        path_t2 = "data/ssmt/optical_t2.bmp"
        path_pair = (path_t1, path_t2)
        batch_size = 2
        pipeline = pdrs.transforms.Compose([pdrs.transforms.Normalize()])

        def run_predictor(data):
            return predictor.predict(data, transforms=pipeline)

        def run_trainer(data):
            return trainer.predict(data, transforms=pipeline)

        # Passing one path instead of a (t1, t2) pair is expected to fail.
        with self.assertRaises(ValueError):
            run_predictor(path_t1)

        # --- One pair, given as file paths ---
        file_out_p = run_predictor(path_pair)
        file_out_t = run_trainer(path_pair)
        self.check_dict_equal(file_out_p, file_out_t)
        file_list_out_p = run_predictor([path_pair])
        self.assertEqual(len(file_list_out_p), 1)
        self.check_dict_equal(file_list_out_p[0], file_out_p)
        file_list_out_t = run_trainer([path_pair])
        self.check_dict_equal(file_list_out_p[0], file_list_out_t[0])

        # --- One pair, given as ndarrays ---
        array_pair = (
            cv2.imread(path_t1).astype('float32'),
            cv2.imread(path_t2).astype('float32'))
        array_out_p = run_predictor(array_pair)
        self.check_dict_equal(array_out_p, file_out_p)
        array_out_t = run_trainer(array_pair)
        self.check_dict_equal(array_out_p, array_out_t)
        array_list_out_p = run_predictor([array_pair])
        self.assertEqual(len(array_list_out_p), 1)
        self.check_dict_equal(array_list_out_p[0], array_out_p)
        array_list_out_t = run_trainer([array_pair])
        self.check_dict_equal(array_list_out_p[0], array_list_out_t[0])

        # The multi-input checks are skipped for BIT; presumably because BIT
        # is exported with a batch dimension fixed to 1 (see the 'BIT' entry
        # in TRAINER_NAME_TO_EXPORT_OPTS) -- TODO confirm.
        if isinstance(trainer, pdrs.tasks.change_detector.BIT):
            return

        # --- Several pairs, given as file paths ---
        path_batch = [path_pair] * batch_size
        file_batch_out_p = run_predictor(path_batch)
        self.assertEqual(len(file_batch_out_p), batch_size)
        file_batch_out_t = run_trainer(path_batch)
        self.check_dict_equal(file_batch_out_p, file_batch_out_t)

        # --- Several pairs, given as ndarrays ---
        # The images are read again; the batch repeats one tuple object.
        array_batch = [(cv2.imread(path_t1).astype('float32'),
                        cv2.imread(path_t2).astype('float32'))] * batch_size
        array_batch_out_p = run_predictor(array_batch)
        self.assertEqual(len(array_batch_out_p), batch_size)
        array_batch_out_t = run_trainer(array_batch)
        self.check_dict_equal(array_batch_out_p, array_batch_out_t)
@TestPredictor.add_tests
class TestClasPredictor(TestPredictor):
    """
    Predictor tests for the trainers listed in `pdrs.tasks.classifier`.
    """

    MODULE = pdrs.tasks.classifier
    TRAINER_NAME_TO_EXPORT_OPTS = {
        '_default': "--fixed_input_shape [-1,3,256,256]"
    }

    def check_predictor(self, predictor, trainer):
        """
        Check that `predictor` and `trainer` give equal classification results.

        A two-entry label list is assigned to both models first. Inputs are
        then given as a single image and as lists of images, both as file
        paths and as float32 arrays read with `cv2.imread`.

        Args:
            predictor: Object whose `predict()` is under test.
            trainer: Object whose `predict()` output serves as the reference.
        """
        image_path = "data/ssmt/optical_t1.bmp"
        batch_size = 2
        pipeline = pdrs.transforms.Compose([pdrs.transforms.Normalize()])

        # Both models receive the very same label list object.
        class_ids = list(range(2))
        trainer.labels = class_ids
        predictor._model.labels = class_ids

        def run_predictor(data):
            return predictor.predict(data, transforms=pipeline)

        def run_trainer(data):
            return trainer.predict(data, transforms=pipeline)

        # --- One image, given as a file path ---
        file_out_p = run_predictor(image_path)
        file_out_t = run_trainer(image_path)
        self.check_dict_equal(file_out_p, file_out_t)
        file_list_out_p = run_predictor([image_path])
        self.assertEqual(len(file_list_out_p), 1)
        self.check_dict_equal(file_list_out_p[0], file_out_p)
        file_list_out_t = run_trainer([image_path])
        self.check_dict_equal(file_list_out_p[0], file_list_out_t[0])

        # --- One image, given as an ndarray ---
        image_array = cv2.imread(image_path).astype('float32')
        array_out_p = run_predictor(image_array)
        self.check_dict_equal(array_out_p, file_out_p)
        array_out_t = run_trainer(image_array)
        self.check_dict_equal(array_out_p, array_out_t)
        array_list_out_p = run_predictor([image_array])
        self.assertEqual(len(array_list_out_p), 1)
        self.check_dict_equal(array_list_out_p[0], array_out_p)
        array_list_out_t = run_trainer([image_array])
        self.check_dict_equal(array_list_out_p[0], array_list_out_t[0])

        # --- Several images, given as file paths ---
        path_batch = [image_path] * batch_size
        file_batch_out_p = run_predictor(path_batch)
        self.assertEqual(len(file_batch_out_p), batch_size)
        file_batch_out_t = run_trainer(path_batch)
        self.assertEqual(len(file_batch_out_p), len(file_batch_out_t))
        self.check_dict_equal(file_batch_out_p, file_batch_out_t)

        # --- Several images, given as ndarrays ---
        # The image is read again; the batch repeats one array object.
        array_batch = [cv2.imread(image_path).astype('float32')] * batch_size
        array_batch_out_p = run_predictor(array_batch)
        self.assertEqual(len(array_batch_out_p), batch_size)
        array_batch_out_t = run_trainer(array_batch)
        self.assertEqual(len(array_batch_out_p), len(array_batch_out_t))
        self.check_dict_equal(array_batch_out_p, array_batch_out_t)
@TestPredictor.add_tests
class TestDetPredictor(TestPredictor):
    """
    Predictor tests for the trainers listed in `pdrs.tasks.object_detector`.
    """

    MODULE = pdrs.tasks.object_detector
    TRAINER_NAME_TO_EXPORT_OPTS = {
        '_default': "--fixed_input_shape [-1,3,256,256]"
    }

    def check_predictor(self, predictor, trainer):
        """
        Check that `predictor` and `trainer` give equal detection results.

        An 80-entry label list is assigned to both models first. Inputs are
        then given as a single image and as lists of images, both as file
        paths and as float32 arrays read with `cv2.imread`.

        Args:
            predictor: Object whose `predict()` is under test.
            trainer: Object whose `predict()` output serves as the reference.
        """
        image_path = "data/ssmt/optical_t1.bmp"
        batch_size = 2
        pipeline = pdrs.transforms.Compose([pdrs.transforms.Normalize()])

        # Both models receive the very same label list object.
        class_ids = list(range(80))
        trainer.labels = class_ids
        predictor._model.labels = class_ids

        def run_predictor(data):
            return predictor.predict(data, transforms=pipeline)

        def run_trainer(data):
            return trainer.predict(data, transforms=pipeline)

        # --- One image, given as a file path ---
        file_out_p = run_predictor(image_path)
        file_out_t = run_trainer(image_path)
        self.check_dict_equal(file_out_p, file_out_t)
        file_list_out_p = run_predictor([image_path])
        self.assertEqual(len(file_list_out_p), 1)
        self.check_dict_equal(file_list_out_p[0], file_out_p)
        file_list_out_t = run_trainer([image_path])
        self.check_dict_equal(file_list_out_p[0], file_list_out_t[0])

        # --- One image, given as an ndarray ---
        image_array = cv2.imread(image_path).astype('float32')
        array_out_p = run_predictor(image_array)
        self.check_dict_equal(array_out_p, file_out_p)
        array_out_t = run_trainer(image_array)
        self.check_dict_equal(array_out_p, array_out_t)
        array_list_out_p = run_predictor([image_array])
        self.assertEqual(len(array_list_out_p), 1)
        self.check_dict_equal(array_list_out_p[0], array_out_p)
        array_list_out_t = run_trainer([image_array])
        self.check_dict_equal(array_list_out_p[0], array_list_out_t[0])

        # --- Several images, given as file paths ---
        path_batch = [image_path] * batch_size
        file_batch_out_p = run_predictor(path_batch)
        self.assertEqual(len(file_batch_out_p), batch_size)
        file_batch_out_t = run_trainer(path_batch)
        self.assertEqual(len(file_batch_out_p), len(file_batch_out_t))
        self.check_dict_equal(file_batch_out_p, file_batch_out_t)

        # --- Several images, given as ndarrays ---
        # The image is read again; the batch repeats one array object.
        array_batch = [cv2.imread(image_path).astype('float32')] * batch_size
        array_batch_out_p = run_predictor(array_batch)
        self.assertEqual(len(array_batch_out_p), batch_size)
        array_batch_out_t = run_trainer(array_batch)
        self.assertEqual(len(array_batch_out_p), len(array_batch_out_t))
        self.check_dict_equal(array_batch_out_p, array_batch_out_t)
@TestPredictor.add_tests
class TestSegPredictor(TestPredictor):
    """
    Predictor tests for the trainers listed in `pdrs.tasks.segmenter`.
    """

    MODULE = pdrs.tasks.segmenter
    TRAINER_NAME_TO_EXPORT_OPTS = {
        '_default': "--fixed_input_shape [-1,3,256,256]"
    }

    def check_predictor(self, predictor, trainer):
        """
        Check that `predictor` and `trainer` give equal segmentation results.

        Inputs are given as a single image and as lists of images, both as
        file paths and as float32 arrays read with `cv2.imread`.

        Args:
            predictor: Object whose `predict()` is under test.
            trainer: Object whose `predict()` output serves as the reference.
        """
        image_path = "data/ssmt/optical_t1.bmp"
        batch_size = 2
        pipeline = pdrs.transforms.Compose([pdrs.transforms.Normalize()])

        def run_predictor(data):
            return predictor.predict(data, transforms=pipeline)

        def run_trainer(data):
            return trainer.predict(data, transforms=pipeline)

        # --- One image, given as a file path ---
        file_out_p = run_predictor(image_path)
        file_out_t = run_trainer(image_path)
        self.check_dict_equal(file_out_p, file_out_t)
        file_list_out_p = run_predictor([image_path])
        self.assertEqual(len(file_list_out_p), 1)
        self.check_dict_equal(file_list_out_p[0], file_out_p)
        file_list_out_t = run_trainer([image_path])
        self.check_dict_equal(file_list_out_p[0], file_list_out_t[0])

        # --- One image, given as an ndarray ---
        image_array = cv2.imread(image_path).astype('float32')
        array_out_p = run_predictor(image_array)
        self.check_dict_equal(array_out_p, file_out_p)
        array_out_t = run_trainer(image_array)
        self.check_dict_equal(array_out_p, array_out_t)
        array_list_out_p = run_predictor([image_array])
        self.assertEqual(len(array_list_out_p), 1)
        self.check_dict_equal(array_list_out_p[0], array_out_p)
        array_list_out_t = run_trainer([image_array])
        self.check_dict_equal(array_list_out_p[0], array_list_out_t[0])

        # --- Several images, given as file paths ---
        path_batch = [image_path] * batch_size
        file_batch_out_p = run_predictor(path_batch)
        self.assertEqual(len(file_batch_out_p), batch_size)
        file_batch_out_t = run_trainer(path_batch)
        self.assertEqual(len(file_batch_out_p), len(file_batch_out_t))
        self.check_dict_equal(file_batch_out_p, file_batch_out_t)

        # --- Several images, given as ndarrays ---
        # The image is read again; the batch repeats one array object.
        array_batch = [cv2.imread(image_path).astype('float32')] * batch_size
        array_batch_out_p = run_predictor(array_batch)
        self.assertEqual(len(array_batch_out_p), batch_size)
        array_batch_out_t = run_trainer(array_batch)
        self.assertEqual(len(array_batch_out_p), len(array_batch_out_t))
        self.check_dict_equal(array_batch_out_p, array_batch_out_t)
|