123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import inspect
- import paddle
- import numpy as np
- from paddle.static import InputSpec
- from paddlers.utils import logging
- from testing_utils import CommonTest
- class _TestModelNamespace:
- class TestModel(CommonTest):
- MODEL_CLASS = None
- DEFAULT_HW = (256, 256)
- DEFAULT_BATCH_SIZE = 2
- def setUp(self):
- self.set_specs()
- self.set_inputs()
- self.set_targets()
- self.set_models()
- def test_forward(self):
- for i, (
- input, model, target
- ) in enumerate(zip(self.inputs, self.models, self.targets)):
- try:
- if isinstance(input, list):
- output = model(*input)
- else:
- output = model(input)
- self.check_output(output, target)
- except:
- logging.warning(f"Model built with spec{i} failed!")
- raise
- def test_to_static(self):
- for i, (
- input, model, target
- ) in enumerate(zip(self.inputs, self.models, self.targets)):
- try:
- static_model = paddle.jit.to_static(
- model, input_spec=self.get_input_spec(model, input))
- except:
- logging.warning(f"Model built with spec{i} failed!")
- raise
- def check_output(self, output, target):
- pass
- def set_specs(self):
- self.specs = []
- def set_models(self):
- self.models = (self.build_model(spec) for spec in self.specs)
- def set_inputs(self):
- self.inputs = []
- def set_targets(self):
- self.targets = []
- def build_model(self, spec):
- if '_phase' in spec:
- phase = spec.pop('_phase')
- else:
- phase = 'train'
- if '_stop_grad' in spec:
- stop_grad = spec.pop('_stop_grad')
- else:
- stop_grad = False
- model = self.MODEL_CLASS(**spec)
- if phase == 'train':
- model.train()
- elif phase == 'eval':
- model.eval()
- if stop_grad:
- for p in model.parameters():
- p.stop_gradient = True
- return model
- def get_shape(self, c, b=None, h=None, w=None):
- if h is None or w is None:
- h, w = self.DEFAULT_HW
- if b is None:
- b = self.DEFAULT_BATCH_SIZE
- return (b, c, h, w)
- def get_zeros_array(self, c, b=None, h=None, w=None):
- shape = self.get_shape(c, b, h, w)
- return np.zeros(shape)
- def get_randn_tensor(self, c, b=None, h=None, w=None):
- shape = self.get_shape(c, b, h, w)
- return paddle.randn(shape)
- def get_input_spec(self, model, input):
- if not isinstance(input, list):
- input = [input]
- input_spec = []
- for param_name, tensor in zip(
- inspect.signature(model.forward).parameters, input):
- # XXX: Hard-code dtype
- input_spec.append(
- InputSpec(
- shape=tensor.shape, name=param_name, dtype='float32'))
- return input_spec
- def allow_oom(cls):
- def _deco(func):
- def _wrapper(self, *args, **kwargs):
- try:
- func(self, *args, **kwargs)
- except (SystemError, RuntimeError, OSError, MemoryError) as e:
- # XXX: This may not cover all OOM cases.
- msg = str(e)
- if "Out of memory error" in msg \
- or "(External) CUDNN error(4), CUDNN_STATUS_INTERNAL_ERROR." in msg \
- or isinstance(e, MemoryError):
- logging.warning("An OOM error has been ignored.")
- else:
- raise
- return _wrapper
- for key, value in inspect.getmembers(cls):
- if key.startswith('test'):
- value = _deco(value)
- setattr(cls, key, value)
- return cls
- TestModel = _TestModelNamespace.TestModel
|