|
1 # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 |
|
2 # For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt |
|
3 |
|
4 """ |
|
5 Functions to manipulate packed binary representations of number sets. |
|
6 |
|
7 To save space, coverage stores sets of line numbers in SQLite using a packed |
|
8 binary representation called a numbits. A numbits is a set of positive |
|
9 integers. |
|
10 |
|
11 A numbits is stored as a blob in the database. The exact meaning of the bytes |
|
12 in the blobs should be considered an implementation detail that might change in |
|
13 the future. Use these functions to work with those binary blobs of data. |
|
14 |
|
15 """ |
|
16 import json |
|
17 |
|
18 from itertools import zip_longest |
|
19 |
|
20 from coverage.misc import contract, new_contract |
|
21 |
|
22 def _to_blob(b): |
|
23 """Convert a bytestring into a type SQLite will accept for a blob.""" |
|
24 return b |
|
25 |
|
# Define the 'blob' contract name used by the @contract declarations in this
# module: a value satisfies it when it is a `bytes` instance.
new_contract('blob', lambda value: isinstance(value, bytes))
|
27 |
|
28 |
|
@contract(nums='Iterable', returns='blob')
def nums_to_numbits(nums):
    """Pack the integers in `nums` into a numbits blob.

    Arguments:
        nums: a reusable iterable of integers, the line numbers to store.
            It is traversed twice (once to find the largest value, once to
            set the bits), so a one-shot iterator will not work.

    Returns:
        A binary blob.  An empty `nums` produces an empty blob.
    """
    try:
        size = max(nums) // 8 + 1
    except ValueError:
        # max() raises ValueError for an empty iterable: nothing to store.
        return _to_blob(b'')

    packed = bytearray(size)
    for num in nums:
        # Number n lives in byte n // 8, at bit position n % 8.
        byte_index, bit_index = divmod(num, 8)
        packed[byte_index] |= 1 << bit_index
    return _to_blob(bytes(packed))
|
48 |
|
49 |
|
@contract(numbits='blob', returns='list[int]')
def numbits_to_nums(numbits):
    """Unpack a numbits blob into the list of numbers it holds.

    Arguments:
        numbits: a binary blob, the packed number set.

    Returns:
        A list of ints, in ascending order.

    When registered as a SQLite function by :func:`register_sqlite_functions`,
    this returns a string, a JSON-encoded list of ints.

    """
    found = []
    for index, byte in enumerate(numbits):
        # Bit k of byte i represents the number i * 8 + k.
        base = index * 8
        found.extend(base + bit for bit in range(8) if (byte >> bit) & 1)
    return found
|
70 |
|
71 |
|
@contract(numbits1='blob', numbits2='blob', returns='blob')
def numbits_union(numbits1, numbits2):
    """Combine two numbits into one holding every number from either.

    Returns:
        A new numbits, the union of `numbits1` and `numbits2`.
    """
    # Pad the shorter blob with zero bytes so the tail of the longer one is
    # carried into the result unchanged.
    merged = bytes(
        left | right
        for left, right in zip_longest(numbits1, numbits2, fillvalue=0)
    )
    return _to_blob(merged)
|
81 |
|
82 |
|
@contract(numbits1='blob', numbits2='blob', returns='blob')
def numbits_intersection(numbits1, numbits2):
    """Compute the intersection of two numbits.

    Arguments:
        numbits1: a binary blob, a packed number set.
        numbits2: a binary blob, a packed number set.

    Returns:
        A new numbits, the intersection of `numbits1` and `numbits2`, with
        trailing zero bytes removed.
    """
    # Bytes past the end of the shorter blob can only AND to zero, and
    # trailing zeros are stripped anyway, so pairing up to the shorter
    # length with zip() gives the same result without the padding work.
    intersection_bytes = bytes(b1 & b2 for b1, b2 in zip(numbits1, numbits2))
    return _to_blob(intersection_bytes.rstrip(b'\0'))
|
93 |
|
94 |
|
@contract(numbits1='blob', numbits2='blob', returns='bool')
def numbits_any_intersection(numbits1, numbits2):
    """Is there any number that appears in both numbits?

    Determine whether two number sets have a non-empty intersection.  This is
    faster than computing the intersection.

    Arguments:
        numbits1: a binary blob, a packed number set.
        numbits2: a binary blob, a packed number set.

    Returns:
        A bool, True if there is any number in both `numbits1` and `numbits2`.
    """
    # A common number needs a set bit at the same position in both blobs, so
    # only the overlapping bytes matter: zip() stops at the shorter blob, and
    # any() stops at the first overlapping byte with a shared bit.
    return any(b1 & b2 for b1, b2 in zip(numbits1, numbits2))
|
107 |
|
108 |
|
@contract(num='int', numbits='blob', returns='bool')
def num_in_numbits(num, numbits):
    """Does the integer `num` appear in `numbits`?

    Arguments:
        num: the integer to look for.
        numbits: a binary blob, the packed number set.

    Returns:
        A bool, True if `num` is a member of `numbits`.  Negative numbers and
        numbers beyond the end of the blob are never members.
    """
    if num < 0:
        # divmod() would give a negative byte index, which Python treats as
        # counting from the end of the blob: that could report a false match,
        # or raise IndexError on an empty blob.
        return False
    nbyte, nbit = divmod(num, 8)
    if nbyte >= len(numbits):
        return False
    return bool(numbits[nbyte] & (1 << nbit))
|
120 |
|
121 |
|
def register_sqlite_functions(connection):
    """
    Make the numbits functions callable from SQL on a SQLite connection.

    These functions become available in SQLite statements:

    * :func:`numbits_union`
    * :func:`numbits_intersection`
    * :func:`numbits_any_intersection`
    * :func:`num_in_numbits`
    * :func:`numbits_to_nums`

    `connection` is a :class:`sqlite3.Connection <python:sqlite3.Connection>`
    object.  After creating the connection, pass it to this function to
    register the numbits functions.  Then you can use numbits functions in your
    queries::

        import sqlite3
        from coverage.numbits import register_sqlite_functions

        conn = sqlite3.connect('example.db')
        register_sqlite_functions(conn)
        c = conn.cursor()
        # Kind of a nonsense query: find all the files and contexts that
        # executed line 47 in any file:
        c.execute(
            "select file_id, context_id from line_bits where num_in_numbits(?, numbits)",
            (47,)
        )
    """
    def numbits_to_nums_json(numbits):
        # The SQL version returns the numbers as a JSON-encoded string.
        return json.dumps(numbits_to_nums(numbits))

    # (SQL function name, number of arguments, Python implementation)
    sql_functions = (
        ("numbits_union", 2, numbits_union),
        ("numbits_intersection", 2, numbits_intersection),
        ("numbits_any_intersection", 2, numbits_any_intersection),
        ("num_in_numbits", 2, num_in_numbits),
        ("numbits_to_nums", 1, numbits_to_nums_json),
    )
    for sql_name, arg_count, func in sql_functions:
        connection.create_function(sql_name, arg_count, func)