HexEdit/HexEditChunks.py

changeset 4650
b1ca3bcde70b
child 4666
bc52ef526e11
equal deleted inserted replaced
4649:bbc8b2de9173 4650:b1ca3bcde70b
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2016 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the storage backend for the hex editor.
8 """
9
10 from __future__ import unicode_literals
11
12 import sys
13
14 from PyQt5.QtCore import QBuffer, QIODevice, QByteArray
15
16
17 class HexEditChunk(object):
18 """
19 Class implementing a container for the data chunks.
20 """
21 def __init__(self):
22 """
23 Constructor
24 """
25 self.data = bytearray()
26 self.dataChanged = bytearray()
27 self.absPos = 0
28
29
30 class HexEditChunks(object):
31 """
32 Class implementing the storage backend for the hex editor.
33
34 When HexEditWidget loads data, HexEditChunks access them using a QIODevice
35 interface. When the app uses a QByteArray or Python bytearray interface,
36 QBuffer is used to provide again a QIODevice like interface. No data will
37 be changed, therefore HexEditChunks opens the QIODevice in
38 QIODevice.ReadOnly mode. After every access HexEditChunks closes the
39 QIODevice. That's why external applications can overwrite files while
40 HexEditWidget shows them.
41
42 When the the user starts to edit the data, HexEditChunks creates a local
43 copy of a chunk of data (4 kilobytes) and notes all changes there. Parallel
44 to that chunk, there is a second chunk, which keeps track of which bytes
45 are changed and which are not.
46 """
47 BUFFER_SIZE = 0x10000
48 CHUNK_SIZE = 0x1000
49 READ_CHUNK_MASK = 0xfffffffffffff000
50
51 def __init__(self, ioDevice=None):
52 """
53 Constructor
54
55 @param ioDevice io device to get the data from
56 @type QIODevice
57 """
58 self.__ioDevice = None
59 self.__pos = 0
60 self.__size = 0
61 self.__chunks = []
62
63 if ioDevice is None:
64 buf = QBuffer()
65 self.setIODevice(buf)
66 else:
67 self.setIODevice(ioDevice)
68
69 def setIODevice(self, ioDevice):
70 """
71 Public method to set an io device to read the binary data from.
72
73 @param ioDevice io device to get the data from
74 @type QIODevice
75 @return flag indicating successful operation
76 @rtype bool
77 """
78 self.__ioDevice = ioDevice
79 ok = self.__ioDevice.open(QIODevice.ReadOnly)
80 if ok:
81 # open successfully
82 self.__size = self.__ioDevice.size()
83 self.__ioDevice.close()
84 else:
85 # fallback is an empty buffer
86 self.__ioDevice = QBuffer()
87 self.__size = 0
88
89 self.__chunks = []
90 self.__pos = 0
91
92 return ok
93
94 def data(self, pos=0, maxSize=-1, highlighted=None):
95 """
96 Public method to get data out of the chunks.
97
98 @param pos position to get bytes from
99 @type int
100 @param maxSize maximum amount of bytes to get
101 @type int
102 @param highlighted reference to a byte array storing highlighting info
103 @byte bytearray
104 @return retrieved data
105 @rtype bytearray
106 """
107 ioDelta = 0
108 chunkIdx = 0
109
110 chunk = HexEditChunk()
111 buffer = bytearray()
112
113 if highlighted is not None:
114 del highlighted[:]
115
116 if pos >= self.__size:
117 return buffer
118
119 if maxSize < 0:
120 maxSize = self.__size
121 elif (pos + maxSize) > self.__size:
122 maxSize = self.__size - pos
123
124 self.__ioDevice.open(QIODevice.ReadOnly)
125
126 while maxSize > 0:
127 chunk.absPos = sys.maxsize
128 chunksLoopOngoing = True
129 while chunkIdx < len(self.__chunks) and chunksLoopOngoing:
130 # In this section, we track changes before our required data
131 # and we take the edited data, if availible. ioDelta is a
132 # difference counter to justify the read pointer to the
133 # original data, if data in between was deleted or inserted.
134
135 chunk = self.__chunks[chunkIdx]
136 if chunk.absPos > pos:
137 chunksLoopOngoing = False
138 else:
139 chunkIdx += 1
140 chunkOfs = pos - chunk.absPos
141 if maxSize > (len(chunk.data) - chunkOfs):
142 count = len(chunk.data) - chunkOfs
143 ioDelta += self.CHUNK_SIZE - len(chunk.data)
144 else:
145 count = maxSize
146 if count > 0:
147 buffer += chunk.data[chunkOfs:chunkOfs + count]
148 maxSize -= count
149 pos += count
150 if highlighted is not None:
151 highlighted += \
152 chunk.dataChanged[chunkOfs:chunkOfs + count]
153
154 if maxSize > 0 and pos < chunk.absPos:
155 # In this section, we read data from the original source. This
156 # will only happen, when no copied data is available.
157 if chunk.absPos - pos > maxSize:
158 byteCount = maxSize
159 else:
160 byteCount = chunk.absPos - pos
161
162 maxSize -= byteCount
163 self.__ioDevice.seek(pos + ioDelta)
164 readBuffer = bytearray(self.__ioDevice.read(byteCount))
165 buffer += readBuffer
166 if highlighted is not None:
167 highlighted += bytearray(len(readBuffer))
168 pos += len(readBuffer)
169
170 self.__ioDevice.close()
171 return buffer
172
173 def write(self, ioDevice, pos=0, count=-1):
174 """
175 Public method to write data to an io device.
176
177 @param ioDevice io device to write the data to
178 @type QIODevice
179 @param pos position to write bytes from
180 @type int
181 @param count amount of bytes to write
182 @type int
183 @return flag indicating success
184 @rtype bool
185 """
186 if count == -1:
187 # write all data
188 count = self.__size
189
190 ok = ioDevice.open(QIODevice.WriteOnly)
191 if ok:
192 idx = pos
193 while idx < count:
194 data = self.data(idx, self.BUFFER_SIZE)
195 ioDevice.write(QByteArray(data))
196
197 # increment loop variable
198 idx += self.BUFFER_SIZE
199
200 ioDevice.close()
201
202 return ok
203
204 def setDataChanged(self, pos, dataChanged):
205 """
206 Public method to set highlighting info.
207
208 @param pos position to set highlighting info for
209 @type int
210 @param dataChanged flag indicating changed data
211 @type bool
212 """
213 if pos < 0 or pos >= self.__size:
214 # position is out of range, do nothing
215 return
216 chunkIdx = self.__getChunkIndex(pos)
217 posInChunk = pos - self.__chunks[chunkIdx].absPos
218 self.__chunks[chunkIdx].dataChanged[posInChunk] = int(dataChanged)
219
220 def dataChanged(self, pos):
221 """
222 Public method to test, if some data was changed.
223
224 @param pos byte position to check
225 @type int
226 @return flag indicating the changed state
227 @rtype bool
228 """
229 highlighted = bytearray()
230 self.data(pos, 1, highlighted)
231 return highlighted and bool(highlighted[0])
232
233 def indexOf(self, byteArray, start):
234 """
235 Public method to search the first occurrence of some data.
236
237 @param byteArray data to search for
238 @type bytearray
239 @param start position to start the search at
240 @type int
241 @return position the data was found at or -1 if nothing could be found
242 @rtype int
243 """
244 ba = bytearray(byteArray)
245
246 result = -1
247 pos = start
248 while pos < self.__size:
249 buffer = self.data(pos, self.BUFFER_SIZE + len(ba) - 1)
250 findPos = buffer.find(ba)
251 if findPos >= 0:
252 result = pos + findPos
253 break
254
255 # increment loop variable
256 pos += self.BUFFER_SIZE
257
258 return result
259
260 def lastIndexOf(self, byteArray, start):
261 """
262 Public method to search the last occurrence of some data.
263
264 @param byteArray data to search for
265 @type bytearray
266 @param start position to start the search at
267 @type int
268 @return position the data was found at or -1 if nothing could be found
269 @rtype int
270 """
271 ba = bytearray(byteArray)
272
273 result = -1
274 pos = start
275 while pos > 0 and result < 0:
276 sPos = pos - self.BUFFER_SIZE - len(ba) + 1
277 if sPos < 0:
278 sPos = 0
279
280 buffer = self.data(sPos, pos - sPos)
281 findPos = buffer.rfind(ba)
282 if findPos >= 0:
283 result = sPos + findPos
284 break
285
286 # increment loop variable
287 pos -= self.BUFFER_SIZE
288
289 return result
290
291 def insert(self, pos, data):
292 """
293 Public method to insert a byte.
294
295 @param pos position to insert at
296 @type int
297 @param data byte to insert
298 @type int (range 0 to 255)
299 @return flag indicating success
300 @rtype bool
301 """
302 if pos < 0 or pos > self.__size:
303 # position is out of range, do nothing
304 return False
305
306 if pos == self.__size:
307 chunkIdx = self.__getChunkIndex(pos - 1)
308 else:
309 chunkIdx = self.__getChunkIndex(pos)
310 chunk = self.__chunks[chunkIdx]
311 posInChunk = pos - chunk.absPos
312 chunk.data.insert(posInChunk, data)
313 chunk.dataChanged.insert(posInChunk, 1)
314 for idx in range(chunkIdx + 1, len(self.__chunks)):
315 self.__chunks[idx].absPos += 1
316 self.__size += 1
317 self.__pos = pos
318 return True
319
320 def overwrite(self, pos, data):
321 """
322 Public method to overwrite a byte.
323
324 @param pos position to overwrite
325 @type int
326 @param data byte to overwrite with
327 @type int (range 0 to 255)
328 @return flag indicating success
329 @rtype bool
330 """
331 if pos < 0 or pos >= self.__size:
332 # position is out of range, do nothing
333 return False
334
335 chunkIdx = self.__getChunkIndex(pos)
336 chunk = self.__chunks[chunkIdx]
337 posInChunk = pos - chunk.absPos
338 chunk.data[posInChunk] = data
339 chunk.dataChanged[posInChunk] = 1
340 self.__pos = pos
341 return True
342
343 def removeAt(self, pos):
344 """
345 Public method to remove a byte.
346
347 @param pos position to remove
348 @type int
349 @return flag indicating success
350 @rtype bool
351 """
352 if pos < 0 or pos >= self.__size:
353 # position is out of range, do nothing
354 return
355
356 chunkIdx = self.__getChunkIndex(pos)
357 chunk = self.__chunks[chunkIdx]
358 posInChunk = pos - chunk.absPos
359 chunk.data.pop(posInChunk)
360 chunk.dataChanged.pop(posInChunk)
361 for idx in range(chunkIdx + 1, len(self.__chunks)):
362 self.__chunks[idx].absPos -= 1
363 self.__size -= 1
364 self.__pos = pos
365 return True
366
367 def __getitem__(self, pos):
368 """
369 Special method to get a byte at a position.
370
371 Note: This realizes the [] get operator.
372
373 @param pos position of byte to get
374 @type int
375 @return requested byte
376 @rtype int (range 0 to 255)
377 """
378 if pos >= self.__size:
379 return 0
380 ## raise IndexError
381
382 return self.data(pos, 1)[0]
383
384 def pos(self):
385 """
386 Public method to get the current position.
387
388 @return current position
389 @rtype int
390 """
391 return self.__pos
392
393 def size(self):
394 """
395 Public method to get the current data size.
396
397 @return current data size
398 @rtype int
399 """
400 return self.__size
401
402 def __getChunkIndex(self, absPos):
403 """
404 Private method to get the chunk index for a position.
405
406 This method checks, if there is already a copied chunk available. If
407 there is one, it returns its index. If there is no copied chunk
408 available, original data will be copied into a new chunk.
409
410 @param absPos absolute position of the data.
411 @type int
412 @return index of the chunk containing the position
413 @rtype int
414 """
415 foundIdx = -1
416 insertIdx = 0
417 ioDelta = 0
418
419 for idx in range(len(self.__chunks)):
420 chunk = self.__chunks[idx]
421 if absPos >= chunk.absPos and \
422 absPos < (chunk.absPos + len(chunk.data)):
423 foundIdx = idx
424 break
425
426 if absPos < chunk.absPos:
427 insertIdx = idx
428 break
429
430 ioDelta += len(chunk.data) - self.CHUNK_SIZE
431 insertIdx = idx + 1
432
433 if foundIdx == -1:
434 newChunk = HexEditChunk()
435 readAbsPos = absPos - ioDelta
436 readPos = readAbsPos & self.READ_CHUNK_MASK
437 self.__ioDevice.open(QIODevice.ReadOnly)
438 self.__ioDevice.seek(readPos)
439 newChunk.data = bytearray(self.__ioDevice.read(self.CHUNK_SIZE))
440 self.__ioDevice.close()
441 newChunk.absPos = absPos - (readAbsPos - readPos)
442 newChunk.dataChanged = bytearray(len(newChunk.data))
443 self.__chunks.insert(insertIdx, newChunk)
444 foundIdx = insertIdx
445
446 return foundIdx

eric ide

mercurial