|
1 # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 |
|
2 # For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt |
|
3 |
|
4 """ |
|
5 Functions to manipulate packed binary representations of number sets. |
|
6 |
|
7 To save space, coverage stores sets of line numbers in SQLite using a packed |
|
8 binary representation called a numbits. A numbits is a set of positive |
|
9 integers. |
|
10 |
|
11 A numbits is stored as a blob in the database. The exact meaning of the bytes |
|
12 in the blobs should be considered an implementation detail that might change in |
|
13 the future. Use these functions to work with those binary blobs of data. |
|
14 |
|
15 """ |
|
16 import json |
|
17 |
|
18 from coverage import env |
|
19 from coverage.backward import byte_to_int, bytes_to_ints, binary_bytes, zip_longest |
|
20 from coverage.misc import contract, new_contract |
|
21 |
|
22 if env.PY3: |
|
23 def _to_blob(b): |
|
24 """Convert a bytestring into a type SQLite will accept for a blob.""" |
|
25 return b |
|
26 |
|
27 new_contract('blob', lambda v: isinstance(v, bytes)) |
|
28 else: |
|
29 def _to_blob(b): |
|
30 """Convert a bytestring into a type SQLite will accept for a blob.""" |
|
31 return buffer(b) # pylint: disable=undefined-variable |
|
32 |
|
33 new_contract('blob', lambda v: isinstance(v, buffer)) # pylint: disable=undefined-variable |
|
34 |
|
35 |
|
36 @contract(nums='Iterable', returns='blob') |
|
37 def nums_to_numbits(nums): |
|
38 """Convert `nums` into a numbits. |
|
39 |
|
40 Arguments: |
|
41 nums: a reusable iterable of integers, the line numbers to store. |
|
42 |
|
43 Returns: |
|
44 A binary blob. |
|
45 """ |
|
46 try: |
|
47 nbytes = max(nums) // 8 + 1 |
|
48 except ValueError: |
|
49 # nums was empty. |
|
50 return _to_blob(b'') |
|
51 b = bytearray(nbytes) |
|
52 for num in nums: |
|
53 b[num//8] |= 1 << num % 8 |
|
54 return _to_blob(bytes(b)) |
|
55 |
|
56 |
|
57 @contract(numbits='blob', returns='list[int]') |
|
58 def numbits_to_nums(numbits): |
|
59 """Convert a numbits into a list of numbers. |
|
60 |
|
61 Arguments: |
|
62 numbits: a binary blob, the packed number set. |
|
63 |
|
64 Returns: |
|
65 A list of ints. |
|
66 |
|
67 When registered as a SQLite function by :func:`register_sqlite_functions`, |
|
68 this returns a string, a JSON-encoded list of ints. |
|
69 |
|
70 """ |
|
71 nums = [] |
|
72 for byte_i, byte in enumerate(bytes_to_ints(numbits)): |
|
73 for bit_i in range(8): |
|
74 if (byte & (1 << bit_i)): |
|
75 nums.append(byte_i * 8 + bit_i) |
|
76 return nums |
|
77 |
|
78 |
|
79 @contract(numbits1='blob', numbits2='blob', returns='blob') |
|
80 def numbits_union(numbits1, numbits2): |
|
81 """Compute the union of two numbits. |
|
82 |
|
83 Returns: |
|
84 A new numbits, the union of `numbits1` and `numbits2`. |
|
85 """ |
|
86 byte_pairs = zip_longest(bytes_to_ints(numbits1), bytes_to_ints(numbits2), fillvalue=0) |
|
87 return _to_blob(binary_bytes(b1 | b2 for b1, b2 in byte_pairs)) |
|
88 |
|
89 |
|
90 @contract(numbits1='blob', numbits2='blob', returns='blob') |
|
91 def numbits_intersection(numbits1, numbits2): |
|
92 """Compute the intersection of two numbits. |
|
93 |
|
94 Returns: |
|
95 A new numbits, the intersection `numbits1` and `numbits2`. |
|
96 """ |
|
97 byte_pairs = zip_longest(bytes_to_ints(numbits1), bytes_to_ints(numbits2), fillvalue=0) |
|
98 intersection_bytes = binary_bytes(b1 & b2 for b1, b2 in byte_pairs) |
|
99 return _to_blob(intersection_bytes.rstrip(b'\0')) |
|
100 |
|
101 |
|
102 @contract(numbits1='blob', numbits2='blob', returns='bool') |
|
103 def numbits_any_intersection(numbits1, numbits2): |
|
104 """Is there any number that appears in both numbits? |
|
105 |
|
106 Determine whether two number sets have a non-empty intersection. This is |
|
107 faster than computing the intersection. |
|
108 |
|
109 Returns: |
|
110 A bool, True if there is any number in both `numbits1` and `numbits2`. |
|
111 """ |
|
112 byte_pairs = zip_longest(bytes_to_ints(numbits1), bytes_to_ints(numbits2), fillvalue=0) |
|
113 return any(b1 & b2 for b1, b2 in byte_pairs) |
|
114 |
|
115 |
|
116 @contract(num='int', numbits='blob', returns='bool') |
|
117 def num_in_numbits(num, numbits): |
|
118 """Does the integer `num` appear in `numbits`? |
|
119 |
|
120 Returns: |
|
121 A bool, True if `num` is a member of `numbits`. |
|
122 """ |
|
123 nbyte, nbit = divmod(num, 8) |
|
124 if nbyte >= len(numbits): |
|
125 return False |
|
126 return bool(byte_to_int(numbits[nbyte]) & (1 << nbit)) |
|
127 |
|
128 |
|
129 def register_sqlite_functions(connection): |
|
130 """ |
|
131 Define numbits functions in a SQLite connection. |
|
132 |
|
133 This defines these functions for use in SQLite statements: |
|
134 |
|
135 * :func:`numbits_union` |
|
136 * :func:`numbits_intersection` |
|
137 * :func:`numbits_any_intersection` |
|
138 * :func:`num_in_numbits` |
|
139 * :func:`numbits_to_nums` |
|
140 |
|
141 `connection` is a :class:`sqlite3.Connection <python:sqlite3.Connection>` |
|
142 object. After creating the connection, pass it to this function to |
|
143 register the numbits functions. Then you can use numbits functions in your |
|
144 queries:: |
|
145 |
|
146 import sqlite3 |
|
147 from coverage.numbits import register_sqlite_functions |
|
148 |
|
149 conn = sqlite3.connect('example.db') |
|
150 register_sqlite_functions(conn) |
|
151 c = conn.cursor() |
|
152 # Kind of a nonsense query: find all the files and contexts that |
|
153 # executed line 47 in any file: |
|
154 c.execute( |
|
155 "select file_id, context_id from line_bits where num_in_numbits(?, numbits)", |
|
156 (47,) |
|
157 ) |
|
158 """ |
|
159 connection.create_function("numbits_union", 2, numbits_union) |
|
160 connection.create_function("numbits_intersection", 2, numbits_intersection) |
|
161 connection.create_function("numbits_any_intersection", 2, numbits_any_intersection) |
|
162 connection.create_function("num_in_numbits", 2, num_in_numbits) |
|
163 connection.create_function("numbits_to_nums", 1, lambda b: json.dumps(numbits_to_nums(b))) |