|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2016 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the storage backend for the hex editor. |
|
8 """ |
|
9 |
|
10 from __future__ import unicode_literals |
|
11 |
|
12 import sys |
|
13 |
|
14 from PyQt5.QtCore import QBuffer, QIODevice, QByteArray |
|
15 |
|
16 |
|
class HexEditChunk(object):
    """
    Class implementing a container for one chunk of edited data.

    The container carries three public attributes:
    <ul>
    <li>absPos - absolute position of the chunk</li>
    <li>data - bytes of the chunk</li>
    <li>dataChanged - per byte change markers belonging to data</li>
    </ul>
    """
    def __init__(self):
        """
        Constructor

        Creates an empty chunk located at position 0.
        """
        # absolute position of the chunk
        self.absPos = 0
        # payload and change markers are two independent, empty buffers
        self.data, self.dataChanged = bytearray(), bytearray()
|
28 |
|
29 |
|
class HexEditChunks(object):
    """
    Class implementing the storage backend for the hex editor.

    When HexEditWidget loads data, HexEditChunks accesses it using a QIODevice
    interface. When the app uses a QByteArray or Python bytearray interface,
    QBuffer is used to provide again a QIODevice like interface. No data will
    be changed, therefore HexEditChunks opens the QIODevice in
    QIODevice.ReadOnly mode. After every access HexEditChunks closes the
    QIODevice. That's why external applications can overwrite files while
    HexEditWidget shows them.

    When the user starts to edit the data, HexEditChunks creates a local
    copy of a chunk of data (4 kilobytes) and notes all changes there. Parallel
    to that chunk, there is a second chunk, which keeps track of which bytes
    are changed and which are not.
    """
    # amount of bytes handled per step by write(), indexOf() and lastIndexOf()
    BUFFER_SIZE = 0x10000
    # amount of bytes copied from the io device into one edit chunk
    CHUNK_SIZE = 0x1000
    # mask clearing the low 12 bits, i.e. aligning down to CHUNK_SIZE
    READ_CHUNK_MASK = 0xfffffffffffff000

    def __init__(self, ioDevice=None):
        """
        Constructor

        @param ioDevice io device to get the data from (an empty QBuffer is
            used, if None is given)
        @type QIODevice
        """
        self.__ioDevice = None
        self.__pos = 0
        self.__size = 0
        self.__chunks = []

        self.setIODevice(QBuffer() if ioDevice is None else ioDevice)

    def setIODevice(self, ioDevice):
        """
        Public method to set an io device to read the binary data from.

        All chunks created so far are dropped and the position is reset.

        @param ioDevice io device to get the data from
        @type QIODevice
        @return flag indicating successful operation
        @rtype bool
        """
        self.__ioDevice = ioDevice
        ok = self.__ioDevice.open(QIODevice.ReadOnly)
        if ok:
            # opened successfully; just remember the size and close again
            self.__size = self.__ioDevice.size()
            self.__ioDevice.close()
        else:
            # fallback is an empty buffer
            self.__ioDevice = QBuffer()
            self.__size = 0

        self.__chunks = []
        self.__pos = 0

        return ok

    def data(self, pos=0, maxSize=-1, highlighted=None):
        """
        Public method to get data out of the chunks.

        @param pos position to get bytes from
        @type int
        @param maxSize maximum amount of bytes to get (a negative value
            means all bytes up to the end of the data)
        @type int
        @param highlighted reference to a byte array storing highlighting
            info; it is cleared and filled with one marker per returned byte
        @type bytearray
        @return retrieved data
        @rtype bytearray
        """
        buffer = bytearray()

        if highlighted is not None:
            del highlighted[:]

        if pos >= self.__size:
            return buffer

        if maxSize < 0 or (pos + maxSize) > self.__size:
            # never request more than what is left behind pos
            maxSize = self.__size - pos

        ioDelta = 0
        chunkIdx = 0

        self.__ioDevice.open(QIODevice.ReadOnly)
        try:
            while maxSize > 0:
                # Position up to which original data may be read. This is
                # kept in a local variable on purpose; writing it into the
                # chunk object would modify a chunk stored in self.__chunks.
                nextChunkPos = sys.maxsize
                chunksLoopOngoing = True
                while chunkIdx < len(self.__chunks) and chunksLoopOngoing:
                    # In this section, we track changes before our required
                    # data and we take the edited data, if available.
                    # ioDelta is a difference counter to adjust the read
                    # pointer into the original data, if data in between
                    # was deleted or inserted.
                    chunk = self.__chunks[chunkIdx]
                    nextChunkPos = chunk.absPos
                    if chunk.absPos > pos:
                        chunksLoopOngoing = False
                    else:
                        chunkIdx += 1
                        chunkOfs = pos - chunk.absPos
                        if maxSize > (len(chunk.data) - chunkOfs):
                            count = len(chunk.data) - chunkOfs
                            ioDelta += self.CHUNK_SIZE - len(chunk.data)
                        else:
                            count = maxSize
                        if count > 0:
                            chunkEnd = chunkOfs + count
                            buffer += chunk.data[chunkOfs:chunkEnd]
                            maxSize -= count
                            pos += count
                            if highlighted is not None:
                                highlighted += \
                                    chunk.dataChanged[chunkOfs:chunkEnd]

                if maxSize > 0 and pos < nextChunkPos:
                    # In this section, we read data from the original
                    # source. This will only happen, when no copied data is
                    # available.
                    byteCount = min(maxSize, nextChunkPos - pos)

                    maxSize -= byteCount
                    self.__ioDevice.seek(pos + ioDelta)
                    # 'or b""' guards against a None result, which the
                    # binding may deliver in case of a read error
                    readBuffer = bytearray(
                        self.__ioDevice.read(byteCount) or b"")
                    buffer += readBuffer
                    if highlighted is not None:
                        # original data is never highlighted
                        highlighted += bytearray(len(readBuffer))
                    pos += len(readBuffer)
        finally:
            self.__ioDevice.close()

        return buffer

    def write(self, ioDevice, pos=0, count=-1):
        """
        Public method to write data to an io device.

        @param ioDevice io device to write the data to
        @type QIODevice
        @param pos position to write bytes from
        @type int
        @param count amount of bytes to write (-1 to write all data from
            pos up to the end)
        @type int
        @return flag indicating success
        @rtype bool
        """
        if count == -1:
            # write all data
            endPos = self.__size
        else:
            # count is an amount of bytes, not an end position
            endPos = min(pos + count, self.__size)

        ok = ioDevice.open(QIODevice.WriteOnly)
        if ok:
            try:
                idx = pos
                while idx < endPos:
                    # the last block may be shorter than BUFFER_SIZE
                    block = self.data(
                        idx, min(self.BUFFER_SIZE, endPos - idx))
                    ioDevice.write(QByteArray(block))

                    # increment loop variable
                    idx += self.BUFFER_SIZE
            finally:
                ioDevice.close()

        return ok

    def setDataChanged(self, pos, dataChanged):
        """
        Public method to set highlighting info.

        @param pos position to set highlighting info for
        @type int
        @param dataChanged flag indicating changed data
        @type bool
        """
        if pos < 0 or pos >= self.__size:
            # position is out of range, do nothing
            return
        chunkIdx = self.__getChunkIndex(pos)
        posInChunk = pos - self.__chunks[chunkIdx].absPos
        self.__chunks[chunkIdx].dataChanged[posInChunk] = int(dataChanged)

    def dataChanged(self, pos):
        """
        Public method to test, if some data was changed.

        @param pos byte position to check
        @type int
        @return flag indicating the changed state (False, if the position
            could not be read)
        @rtype bool
        """
        highlighted = bytearray()
        self.data(pos, 1, highlighted)
        # always deliver a real bool, even if nothing could be read
        return bool(highlighted) and bool(highlighted[0])

    def indexOf(self, byteArray, start):
        """
        Public method to search the first occurrence of some data.

        @param byteArray data to search for
        @type bytearray
        @param start position to start the search at
        @type int
        @return position the data was found at or -1 if nothing could be found
        @rtype int
        """
        ba = bytearray(byteArray)

        result = -1
        pos = start
        while pos < self.__size:
            # read len(ba) - 1 extra bytes, so that a match crossing the
            # border between two steps is found as well
            buffer = self.data(pos, self.BUFFER_SIZE + len(ba) - 1)
            findPos = buffer.find(ba)
            if findPos >= 0:
                result = pos + findPos
                break

            # increment loop variable
            pos += self.BUFFER_SIZE

        return result

    def lastIndexOf(self, byteArray, start):
        """
        Public method to search the last occurrence of some data.

        @param byteArray data to search for
        @type bytearray
        @param start position to start the search at
        @type int
        @return position the data was found at or -1 if nothing could be found
        @rtype int
        """
        ba = bytearray(byteArray)

        result = -1
        pos = start
        while pos > 0 and result < 0:
            # start of the window ending at pos (clipped at the beginning)
            sPos = max(pos - self.BUFFER_SIZE - len(ba) + 1, 0)

            buffer = self.data(sPos, pos - sPos)
            findPos = buffer.rfind(ba)
            if findPos >= 0:
                result = sPos + findPos
                break

            # decrement loop variable
            pos -= self.BUFFER_SIZE

        return result

    def insert(self, pos, data):
        """
        Public method to insert a byte.

        @param pos position to insert at
        @type int
        @param data byte to insert
        @type int (range 0 to 255)
        @return flag indicating success
        @rtype bool
        """
        if pos < 0 or pos > self.__size:
            # position is out of range, do nothing
            return False

        if pos == self.__size:
            # Appending: use the chunk of the last existing byte. For empty
            # data there is no such byte; clamp to 0 because a negative
            # position must not be run through READ_CHUNK_MASK.
            chunkIdx = self.__getChunkIndex(max(pos - 1, 0))
        else:
            chunkIdx = self.__getChunkIndex(pos)
        chunk = self.__chunks[chunkIdx]
        posInChunk = pos - chunk.absPos
        chunk.data.insert(posInChunk, data)
        chunk.dataChanged.insert(posInChunk, 1)
        # all chunks behind the changed one move up by one byte
        for idx in range(chunkIdx + 1, len(self.__chunks)):
            self.__chunks[idx].absPos += 1
        self.__size += 1
        self.__pos = pos
        return True

    def overwrite(self, pos, data):
        """
        Public method to overwrite a byte.

        @param pos position to overwrite
        @type int
        @param data byte to overwrite with
        @type int (range 0 to 255)
        @return flag indicating success
        @rtype bool
        """
        if pos < 0 or pos >= self.__size:
            # position is out of range, do nothing
            return False

        chunkIdx = self.__getChunkIndex(pos)
        chunk = self.__chunks[chunkIdx]
        posInChunk = pos - chunk.absPos
        chunk.data[posInChunk] = data
        chunk.dataChanged[posInChunk] = 1
        self.__pos = pos
        return True

    def removeAt(self, pos):
        """
        Public method to remove a byte.

        @param pos position to remove
        @type int
        @return flag indicating success
        @rtype bool
        """
        if pos < 0 or pos >= self.__size:
            # position is out of range, do nothing
            return False

        chunkIdx = self.__getChunkIndex(pos)
        chunk = self.__chunks[chunkIdx]
        posInChunk = pos - chunk.absPos
        chunk.data.pop(posInChunk)
        chunk.dataChanged.pop(posInChunk)
        # all chunks behind the changed one move down by one byte
        for idx in range(chunkIdx + 1, len(self.__chunks)):
            self.__chunks[idx].absPos -= 1
        self.__size -= 1
        self.__pos = pos
        return True

    def __getitem__(self, pos):
        """
        Special method to get a byte at a position.

        Note: This realizes the [] get operator. Positions outside of the
        data deliver 0 instead of raising an IndexError.

        @param pos position of byte to get
        @type int
        @return requested byte
        @rtype int (range 0 to 255)
        """
        if pos < 0 or pos >= self.__size:
            return 0

        return self.data(pos, 1)[0]

    def pos(self):
        """
        Public method to get the current position.

        @return current position (i.e. the position of the last successful
            insert, overwrite or remove operation)
        @rtype int
        """
        return self.__pos

    def size(self):
        """
        Public method to get the current data size.

        @return current data size
        @rtype int
        """
        return self.__size

    def __getChunkIndex(self, absPos):
        """
        Private method to get the chunk index for a position.

        This method checks, if there is already a copied chunk available. If
        there is one, it returns its index. If there is no copied chunk
        available, original data will be copied into a new chunk.

        @param absPos absolute position of the data.
        @type int
        @return index of the chunk containing the position
        @rtype int
        """
        foundIdx = -1
        insertIdx = 0
        # accumulated size difference of the chunks in front of absPos
        # compared to the original data
        ioDelta = 0

        for idx, chunk in enumerate(self.__chunks):
            if chunk.absPos <= absPos < (chunk.absPos + len(chunk.data)):
                foundIdx = idx
                break

            if absPos < chunk.absPos:
                insertIdx = idx
                break

            ioDelta += len(chunk.data) - self.CHUNK_SIZE
            insertIdx = idx + 1

        if foundIdx == -1:
            newChunk = HexEditChunk()
            # translate into a position of the original data and align it
            # down to a chunk boundary
            readAbsPos = absPos - ioDelta
            readPos = readAbsPos & self.READ_CHUNK_MASK
            self.__ioDevice.open(QIODevice.ReadOnly)
            try:
                self.__ioDevice.seek(readPos)
                # 'or b""' guards against a None result, which the binding
                # may deliver in case of a read error
                newChunk.data = bytearray(
                    self.__ioDevice.read(self.CHUNK_SIZE) or b"")
            finally:
                self.__ioDevice.close()
            newChunk.absPos = absPos - (readAbsPos - readPos)
            newChunk.dataChanged = bytearray(len(newChunk.data))
            self.__chunks.insert(insertIdx, newChunk)
            foundIdx = insertIdx

        return foundIdx