1 # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 |
|
2 # For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt |
|
3 |
|
4 """Miscellaneous stuff for coverage.py.""" |
|
5 |
|
6 import errno |
|
7 import hashlib |
|
8 import inspect |
|
9 import locale |
|
10 import os |
|
11 import sys |
|
12 import types |
|
13 |
|
14 from coverage import env |
|
15 from coverage.backward import string_class, to_bytes, unicode_class |
|
16 |
|
# Isolated copies made so far, keyed by the original module object.
ISOLATED_MODULES = {}


def isolate_module(mod):
    """Return a private copy of `mod`, shielded from aggressive mocking.

    Test suites sometimes patch attributes such as os.path.exists.  If we then
    use that attribute while the patch is in place, we get tangled up in their
    mock.  Copying the module's attributes into a fresh module object when we
    import it keeps coverage.py isolated from those complications.

    Attributes that are themselves modules are copied recursively.  Each copy
    is remembered in ISOLATED_MODULES, so asking for the same module again
    returns the same copy.
    """
    isolated = ISOLATED_MODULES.get(mod)
    if isolated is None:
        isolated = types.ModuleType(mod.__name__)
        # Record the copy before filling it in, so modules that refer to each
        # other don't make us recurse forever.
        ISOLATED_MODULES[mod] = isolated
        for attr_name in dir(mod):
            attr = getattr(mod, attr_name)
            if isinstance(attr, types.ModuleType):
                attr = isolate_module(attr)
            setattr(isolated, attr_name, attr)
    return isolated

# From here on, this module's `os` is our own isolated copy.
os = isolate_module(os)
|
39 |
|
40 |
|
# PyContracts checks parameters and return values for us, but we only switch
# it on while running our own test suite.
if env.TESTING:
    from contracts import contract              # pylint: disable=unused-import
    from contracts import new_contract as raw_new_contract

    def new_contract(*args, **kwargs):
        """Forward to contracts.new_contract, tolerating a repeated definition."""
        try:
            result = raw_new_contract(*args, **kwargs)
        except ValueError:
            # Meta-coverage imports this module twice, and PyContracts
            # objects to a contract being defined again.  That's harmless.
            result = None
        return result

    # Contract words that PyContracts doesn't supply itself.
    new_contract('bytes', lambda v: isinstance(v, bytes))
    if env.PY3:
        new_contract('unicode', lambda v: isinstance(v, unicode_class))
else:                                           # pragma: not covered
    # Real PyContracts isn't in use, so provide do-nothing stand-ins with the
    # same names.
    def contract(**unused):
        """Stand-in for `contract`: a decorator that changes nothing."""
        def _unchanged(func):
            return func
        return _unchanged

    def new_contract(*args_unused, **kwargs_unused):
        """Stand-in for `new_contract`: accepts anything and does nothing."""
        return None
|
70 |
|
71 |
|
def nice_pair(pair):
    """Make a nice string representation of a pair of numbers.

    If the numbers are equal, just return the number, otherwise return the pair
    with a dash between them, indicating the range.

    """
    start, end = pair
    if start == end:
        return "%d" % start
    return "%d-%d" % (start, end)


def format_lines(statements, lines):
    """Nicely format a list of line numbers.

    Format a list of line numbers for printing by coalescing groups of lines as
    long as the lines represent consecutive statements.  This will coalesce
    even if there are gaps between statements.

    For example, if `statements` is [1,2,3,4,5,10,11,12,13,14] and
    `lines` is [1,2,5,10,11,13,14] then the result will be "1-2, 5-11, 13-14".

    Both arguments are sorted before use.  Every entry in `lines` is expected
    to also appear in `statements`: an entry that doesn't is never matched,
    and the lines after it are not reported.

    Returns the ranges as a single string, joined with ", ".  The result is
    empty if nothing matched.

    """
    pairs = []
    start = end = None
    statements = sorted(statements)
    lines = sorted(lines)
    num_lines = len(lines)
    j = 0
    for stmt in statements:
        if j >= num_lines:
            break
        if stmt == lines[j]:
            if start is None:
                start = lines[j]
            end = lines[j]
            j += 1
        elif start is not None:
            # Compare against None rather than by truthiness, so a run that
            # begins at line number 0 is still recorded.
            pairs.append((start, end))
            start = None
    if start is not None:
        pairs.append((start, end))
    return ', '.join(map(nice_pair, pairs))
|
117 |
|
118 |
|
def expensive(fn):
    """Decorator marking a method that shouldn't be called more than once.

    Outside of testing, `fn` is returned untouched.  When env.TESTING is set,
    `fn` is wrapped so that calling it a second time on the same object raises
    an exception.

    """
    if not env.TESTING:
        return fn

    # Name of the marker attribute set on the object after the first call.
    once_flag = "_once_" + fn.__name__

    def _wrapped(self):
        """Run the method, unless it has already run on this object."""
        if hasattr(self, once_flag):
            raise Exception("Shouldn't have called %s more than once" % fn.__name__)
        setattr(self, once_flag, True)
        return fn(self)

    return _wrapped
|
138 |
|
139 |
|
def bool_or_none(b):
    """Convert `b` to a bool, except that None is passed through as None."""
    return None if b is None else bool(b)
|
146 |
|
147 |
|
def join_regex(regexes):
    """Build one regex that matches anything any of `regexes` matches.

    Each pattern is wrapped in a non-capturing group, and the groups are
    joined as alternatives.
    """
    alternatives = ["(?:%s)" % regex for regex in regexes]
    return "|".join(alternatives)
|
151 |
|
152 |
|
def file_be_gone(path):
    """Delete the file at `path`, and don't complain if it isn't there.

    Any failure other than the file not existing is re-raised.
    """
    try:
        os.remove(path)
    except OSError as err:
        if err.errno == errno.ENOENT:
            return
        raise
|
160 |
|
161 |
|
def output_encoding(outfile=None):
    """Pick the encoding for output written to `outfile` (default: sys.stdout).

    The first usable value wins: the `encoding` attribute of `outfile`, then
    that of sys.__stdout__, and finally the locale's preferred encoding.
    """
    if outfile is None:
        outfile = sys.stdout
    for stream in (outfile, sys.__stdout__):
        encoding = getattr(stream, "encoding", None)
        if encoding:
            return encoding
    return locale.getpreferredencoding()
|
172 |
|
173 |
|
class Hasher(object):
    """Accumulates an md5 hash over Python data."""

    def __init__(self):
        self.md5 = hashlib.md5()

    def update(self, v):
        """Fold `v` into the hash, recursing into containers and objects.

        The string form of `v`'s type is always hashed first.  Strings, bytes
        and numbers then contribute their value, and None contributes nothing
        more.  Tuples and lists contribute each element in order.  Dicts
        contribute each key and value, in sorted key order.  Any other object
        contributes the name and value of each attribute that is neither a
        double-underscore name nor a routine.
        """
        self.md5.update(to_bytes(str(type(v))))
        if v is None:
            # The type alone stands for None.
            return

        if isinstance(v, string_class):
            self.md5.update(to_bytes(v))
        elif isinstance(v, bytes):
            self.md5.update(v)
        elif isinstance(v, (int, float)):
            self.md5.update(to_bytes(str(v)))
        elif isinstance(v, (tuple, list)):
            for element in v:
                self.update(element)
        elif isinstance(v, dict):
            # Sorted keys make the hash independent of dict ordering.
            for key in sorted(v.keys()):
                self.update(key)
                self.update(v[key])
        else:
            for name in dir(v):
                if name.startswith('__'):
                    continue
                value = getattr(v, name)
                if not inspect.isroutine(value):
                    self.update(name)
                    self.update(value)

    def hexdigest(self):
        """Return the hex digest of everything hashed so far."""
        return self.md5.hexdigest()
|
211 |
|
212 |
|
def _needs_to_implement(that, func_name):
    """Raise NotImplementedError naming the method `that` has to provide.

    Meant to be called from interface stubs.  `that` is identified by its
    `_coverage_plugin_name` attribute if it has one, otherwise by the dotted
    module and class name of its class.
    """
    if hasattr(that, "_coverage_plugin_name"):
        thing, name = "Plugin", that._coverage_plugin_name
    else:
        klass = that.__class__
        thing, name = "Class", "%s.%s" % (klass.__module__, klass.__name__)

    message = "%s %r needs to implement %s()" % (thing, name, func_name)
    raise NotImplementedError(message)
|
228 |
|
229 |
|
class CoverageException(Exception):
    """An exception specific to coverage.py.

    The other exception classes in this module derive from it.
    """
|
233 |
|
234 |
|
class NoSource(CoverageException):
    """The source for a module could not be found."""
|
238 |
|
239 |
|
class NoCode(NoSource):
    """No code at all could be found."""
|
243 |
|
244 |
|
class NotPython(CoverageException):
    """A source file turned out not to be parsable as Python."""
|
248 |
|
249 |
|
class ExceptionDuringRun(CoverageException):
    """An exception was raised while running the customer's code.

    Construct it with three arguments: the values from `sys.exc_info`.

    """
|
257 |
|
258 # |
|
259 # eflag: FileType = Python2 |
|