|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2016 - 2022 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the storage backend for the hex editor. |
|
8 """ |
|
9 |
|
10 import sys |
|
11 |
|
12 from PyQt6.QtCore import QBuffer, QIODevice, QByteArray |
|
13 |
|
14 |
|
15 class HexEditChunk: |
|
16 """ |
|
17 Class implementing a container for the data chunks. |
|
18 """ |
|
19 def __init__(self): |
|
20 """ |
|
21 Constructor |
|
22 """ |
|
23 self.data = bytearray() |
|
24 self.dataChanged = bytearray() |
|
25 self.absPos = 0 |
|
26 |
|
27 |
|
28 class HexEditChunks: |
|
29 """ |
|
30 Class implementing the storage backend for the hex editor. |
|
31 |
|
32 When HexEditWidget loads data, HexEditChunks access them using a QIODevice |
|
33 interface. When the app uses a QByteArray or Python bytearray interface, |
|
34 QBuffer is used to provide again a QIODevice like interface. No data will |
|
35 be changed, therefore HexEditChunks opens the QIODevice in |
|
36 QIODevice.OpenModeFlag.ReadOnly mode. After every access HexEditChunks |
|
37 closes the QIODevice. That's why external applications can overwrite |
|
38 files while HexEditWidget shows them. |
|
39 |
|
40 When the the user starts to edit the data, HexEditChunks creates a local |
|
41 copy of a chunk of data (4 kilobytes) and notes all changes there. Parallel |
|
42 to that chunk, there is a second chunk, which keeps track of which bytes |
|
43 are changed and which are not. |
|
44 """ |
|
45 BUFFER_SIZE = 0x10000 |
|
46 CHUNK_SIZE = 0x1000 |
|
47 READ_CHUNK_MASK = 0xfffffffffffff000 |
|
48 |
|
49 def __init__(self, ioDevice=None): |
|
50 """ |
|
51 Constructor |
|
52 |
|
53 @param ioDevice io device to get the data from |
|
54 @type QIODevice |
|
55 """ |
|
56 self.__ioDevice = None |
|
57 self.__pos = 0 |
|
58 self.__size = 0 |
|
59 self.__chunks = [] |
|
60 |
|
61 if ioDevice is None: |
|
62 buf = QBuffer() |
|
63 self.setIODevice(buf) |
|
64 else: |
|
65 self.setIODevice(ioDevice) |
|
66 |
|
67 def setIODevice(self, ioDevice): |
|
68 """ |
|
69 Public method to set an io device to read the binary data from. |
|
70 |
|
71 @param ioDevice io device to get the data from |
|
72 @type QIODevice |
|
73 @return flag indicating successful operation |
|
74 @rtype bool |
|
75 """ |
|
76 self.__ioDevice = ioDevice |
|
77 ok = self.__ioDevice.open(QIODevice.OpenModeFlag.ReadOnly) |
|
78 if ok: |
|
79 # open successfully |
|
80 self.__size = self.__ioDevice.size() |
|
81 self.__ioDevice.close() |
|
82 else: |
|
83 # fallback is an empty buffer |
|
84 self.__ioDevice = QBuffer() |
|
85 self.__size = 0 |
|
86 |
|
87 self.__chunks = [] |
|
88 self.__pos = 0 |
|
89 |
|
90 return ok |
|
91 |
|
92 def data(self, pos=0, maxSize=-1, highlighted=None): |
|
93 """ |
|
94 Public method to get data out of the chunks. |
|
95 |
|
96 @param pos position to get bytes from |
|
97 @type int |
|
98 @param maxSize maximum amount of bytes to get |
|
99 @type int |
|
100 @param highlighted reference to a byte array storing highlighting info |
|
101 @type bytearray |
|
102 @return retrieved data |
|
103 @rtype bytearray |
|
104 """ |
|
105 ioDelta = 0 |
|
106 chunkIdx = 0 |
|
107 |
|
108 chunk = HexEditChunk() |
|
109 buffer = bytearray() |
|
110 |
|
111 if highlighted is not None: |
|
112 del highlighted[:] |
|
113 |
|
114 if pos >= self.__size: |
|
115 return buffer |
|
116 |
|
117 if maxSize < 0: |
|
118 maxSize = self.__size |
|
119 elif (pos + maxSize) > self.__size: |
|
120 maxSize = self.__size - pos |
|
121 |
|
122 self.__ioDevice.open(QIODevice.OpenModeFlag.ReadOnly) |
|
123 |
|
124 while maxSize > 0: |
|
125 chunk.absPos = sys.maxsize |
|
126 chunksLoopOngoing = True |
|
127 while chunkIdx < len(self.__chunks) and chunksLoopOngoing: |
|
128 # In this section, we track changes before our required data |
|
129 # and we take the edited data, if availible. ioDelta is a |
|
130 # difference counter to justify the read pointer to the |
|
131 # original data, if data in between was deleted or inserted. |
|
132 |
|
133 chunk = self.__chunks[chunkIdx] |
|
134 if chunk.absPos > pos: |
|
135 chunksLoopOngoing = False |
|
136 else: |
|
137 chunkIdx += 1 |
|
138 chunkOfs = pos - chunk.absPos |
|
139 if maxSize > (len(chunk.data) - chunkOfs): |
|
140 count = len(chunk.data) - chunkOfs |
|
141 ioDelta += self.CHUNK_SIZE - len(chunk.data) |
|
142 else: |
|
143 count = maxSize |
|
144 if count > 0: |
|
145 buffer += chunk.data[chunkOfs:chunkOfs + count] |
|
146 maxSize -= count |
|
147 pos += count |
|
148 if highlighted is not None: |
|
149 highlighted += chunk.dataChanged[ |
|
150 chunkOfs:chunkOfs + count] |
|
151 |
|
152 if maxSize > 0 and pos < chunk.absPos: |
|
153 # In this section, we read data from the original source. This |
|
154 # will only happen, when no copied data is available. |
|
155 if chunk.absPos - pos > maxSize: |
|
156 byteCount = maxSize |
|
157 else: |
|
158 byteCount = chunk.absPos - pos |
|
159 |
|
160 maxSize -= byteCount |
|
161 self.__ioDevice.seek(pos + ioDelta) |
|
162 readBuffer = bytearray(self.__ioDevice.read(byteCount)) |
|
163 buffer += readBuffer |
|
164 if highlighted is not None: |
|
165 highlighted += bytearray(len(readBuffer)) |
|
166 pos += len(readBuffer) |
|
167 |
|
168 self.__ioDevice.close() |
|
169 return buffer |
|
170 |
|
171 def write(self, ioDevice, pos=0, count=-1): |
|
172 """ |
|
173 Public method to write data to an io device. |
|
174 |
|
175 @param ioDevice io device to write the data to |
|
176 @type QIODevice |
|
177 @param pos position to write bytes from |
|
178 @type int |
|
179 @param count amount of bytes to write |
|
180 @type int |
|
181 @return flag indicating success |
|
182 @rtype bool |
|
183 """ |
|
184 if count == -1: |
|
185 # write all data |
|
186 count = self.__size |
|
187 |
|
188 ok = ioDevice.open(QIODevice.OpenModeFlag.WriteOnly) |
|
189 if ok: |
|
190 idx = pos |
|
191 while idx < count: |
|
192 data = self.data(idx, self.BUFFER_SIZE) |
|
193 ioDevice.write(QByteArray(data)) |
|
194 |
|
195 # increment loop variable |
|
196 idx += self.BUFFER_SIZE |
|
197 |
|
198 ioDevice.close() |
|
199 |
|
200 return ok |
|
201 |
|
202 def setDataChanged(self, pos, dataChanged): |
|
203 """ |
|
204 Public method to set highlighting info. |
|
205 |
|
206 @param pos position to set highlighting info for |
|
207 @type int |
|
208 @param dataChanged flag indicating changed data |
|
209 @type bool |
|
210 """ |
|
211 if pos < 0 or pos >= self.__size: |
|
212 # position is out of range, do nothing |
|
213 return |
|
214 chunkIdx = self.__getChunkIndex(pos) |
|
215 posInChunk = pos - self.__chunks[chunkIdx].absPos |
|
216 self.__chunks[chunkIdx].dataChanged[posInChunk] = int(dataChanged) |
|
217 |
|
218 def dataChanged(self, pos): |
|
219 """ |
|
220 Public method to test, if some data was changed. |
|
221 |
|
222 @param pos byte position to check |
|
223 @type int |
|
224 @return flag indicating the changed state |
|
225 @rtype bool |
|
226 """ |
|
227 highlighted = bytearray() |
|
228 self.data(pos, 1, highlighted) |
|
229 return highlighted and bool(highlighted[0]) |
|
230 |
|
231 def indexOf(self, byteArray, start): |
|
232 """ |
|
233 Public method to search the first occurrence of some data. |
|
234 |
|
235 @param byteArray data to search for |
|
236 @type bytearray |
|
237 @param start position to start the search at |
|
238 @type int |
|
239 @return position the data was found at or -1 if nothing could be found |
|
240 @rtype int |
|
241 """ |
|
242 ba = bytearray(byteArray) |
|
243 |
|
244 result = -1 |
|
245 pos = start |
|
246 while pos < self.__size: |
|
247 buffer = self.data(pos, self.BUFFER_SIZE + len(ba) - 1) |
|
248 findPos = buffer.find(ba) |
|
249 if findPos >= 0: |
|
250 result = pos + findPos |
|
251 break |
|
252 |
|
253 # increment loop variable |
|
254 pos += self.BUFFER_SIZE |
|
255 |
|
256 return result |
|
257 |
|
258 def lastIndexOf(self, byteArray, start): |
|
259 """ |
|
260 Public method to search the last occurrence of some data. |
|
261 |
|
262 @param byteArray data to search for |
|
263 @type bytearray |
|
264 @param start position to start the search at |
|
265 @type int |
|
266 @return position the data was found at or -1 if nothing could be found |
|
267 @rtype int |
|
268 """ |
|
269 ba = bytearray(byteArray) |
|
270 |
|
271 result = -1 |
|
272 pos = start |
|
273 while pos > 0 and result < 0: |
|
274 sPos = pos - self.BUFFER_SIZE - len(ba) + 1 |
|
275 if sPos < 0: |
|
276 sPos = 0 |
|
277 |
|
278 buffer = self.data(sPos, pos - sPos) |
|
279 findPos = buffer.rfind(ba) |
|
280 if findPos >= 0: |
|
281 result = sPos + findPos |
|
282 break |
|
283 |
|
284 # increment loop variable |
|
285 pos -= self.BUFFER_SIZE |
|
286 |
|
287 return result |
|
288 |
|
289 def insert(self, pos, data): |
|
290 """ |
|
291 Public method to insert a byte. |
|
292 |
|
293 @param pos position to insert at |
|
294 @type int |
|
295 @param data byte to insert |
|
296 @type int (range 0 to 255) |
|
297 @return flag indicating success |
|
298 @rtype bool |
|
299 """ |
|
300 if pos < 0 or pos > self.__size: |
|
301 # position is out of range, do nothing |
|
302 return False |
|
303 |
|
304 chunkIdx = ( |
|
305 self.__getChunkIndex(pos - 1) |
|
306 if pos == self.__size else |
|
307 self.__getChunkIndex(pos) |
|
308 ) |
|
309 chunk = self.__chunks[chunkIdx] |
|
310 posInChunk = pos - chunk.absPos |
|
311 chunk.data.insert(posInChunk, data) |
|
312 chunk.dataChanged.insert(posInChunk, 1) |
|
313 for idx in range(chunkIdx + 1, len(self.__chunks)): |
|
314 self.__chunks[idx].absPos += 1 |
|
315 self.__size += 1 |
|
316 self.__pos = pos |
|
317 return True |
|
318 |
|
319 def overwrite(self, pos, data): |
|
320 """ |
|
321 Public method to overwrite a byte. |
|
322 |
|
323 @param pos position to overwrite |
|
324 @type int |
|
325 @param data byte to overwrite with |
|
326 @type int (range 0 to 255) |
|
327 @return flag indicating success |
|
328 @rtype bool |
|
329 """ |
|
330 if pos < 0 or pos >= self.__size: |
|
331 # position is out of range, do nothing |
|
332 return False |
|
333 |
|
334 chunkIdx = self.__getChunkIndex(pos) |
|
335 chunk = self.__chunks[chunkIdx] |
|
336 posInChunk = pos - chunk.absPos |
|
337 chunk.data[posInChunk] = data |
|
338 chunk.dataChanged[posInChunk] = 1 |
|
339 self.__pos = pos |
|
340 return True |
|
341 |
|
342 def removeAt(self, pos): |
|
343 """ |
|
344 Public method to remove a byte. |
|
345 |
|
346 @param pos position to remove |
|
347 @type int |
|
348 @return flag indicating success |
|
349 @rtype bool |
|
350 """ |
|
351 if pos < 0 or pos >= self.__size: |
|
352 # position is out of range, do nothing |
|
353 return False |
|
354 |
|
355 chunkIdx = self.__getChunkIndex(pos) |
|
356 chunk = self.__chunks[chunkIdx] |
|
357 posInChunk = pos - chunk.absPos |
|
358 chunk.data.pop(posInChunk) |
|
359 chunk.dataChanged.pop(posInChunk) |
|
360 for idx in range(chunkIdx + 1, len(self.__chunks)): |
|
361 self.__chunks[idx].absPos -= 1 |
|
362 self.__size -= 1 |
|
363 self.__pos = pos |
|
364 return True |
|
365 |
|
366 def __getitem__(self, pos): |
|
367 """ |
|
368 Special method to get a byte at a position. |
|
369 |
|
370 Note: This realizes the [] get operator. |
|
371 |
|
372 @param pos position of byte to get |
|
373 @type int |
|
374 @return requested byte |
|
375 @rtype int (range 0 to 255) |
|
376 """ |
|
377 if pos >= self.__size: |
|
378 return 0 |
|
379 ## raise IndexError |
|
380 |
|
381 return self.data(pos, 1)[0] |
|
382 |
|
383 def pos(self): |
|
384 """ |
|
385 Public method to get the current position. |
|
386 |
|
387 @return current position |
|
388 @rtype int |
|
389 """ |
|
390 return self.__pos |
|
391 |
|
392 def size(self): |
|
393 """ |
|
394 Public method to get the current data size. |
|
395 |
|
396 @return current data size |
|
397 @rtype int |
|
398 """ |
|
399 return self.__size |
|
400 |
|
401 def __getChunkIndex(self, absPos): |
|
402 """ |
|
403 Private method to get the chunk index for a position. |
|
404 |
|
405 This method checks, if there is already a copied chunk available. If |
|
406 there is one, it returns its index. If there is no copied chunk |
|
407 available, original data will be copied into a new chunk. |
|
408 |
|
409 @param absPos absolute position of the data. |
|
410 @type int |
|
411 @return index of the chunk containing the position |
|
412 @rtype int |
|
413 """ |
|
414 foundIdx = -1 |
|
415 insertIdx = 0 |
|
416 ioDelta = 0 |
|
417 |
|
418 for idx in range(len(self.__chunks)): |
|
419 chunk = self.__chunks[idx] |
|
420 if ( |
|
421 absPos >= chunk.absPos and |
|
422 absPos < (chunk.absPos + len(chunk.data)) |
|
423 ): |
|
424 foundIdx = idx |
|
425 break |
|
426 |
|
427 if absPos < chunk.absPos: |
|
428 insertIdx = idx |
|
429 break |
|
430 |
|
431 ioDelta += len(chunk.data) - self.CHUNK_SIZE |
|
432 insertIdx = idx + 1 |
|
433 |
|
434 if foundIdx == -1: |
|
435 newChunk = HexEditChunk() |
|
436 readAbsPos = absPos - ioDelta |
|
437 readPos = readAbsPos & self.READ_CHUNK_MASK |
|
438 self.__ioDevice.open(QIODevice.OpenModeFlag.ReadOnly) |
|
439 self.__ioDevice.seek(readPos) |
|
440 newChunk.data = bytearray(self.__ioDevice.read(self.CHUNK_SIZE)) |
|
441 self.__ioDevice.close() |
|
442 newChunk.absPos = absPos - (readAbsPos - readPos) |
|
443 newChunk.dataChanged = bytearray(len(newChunk.data)) |
|
444 self.__chunks.insert(insertIdx, newChunk) |
|
445 foundIdx = insertIdx |
|
446 |
|
447 return foundIdx |