src/eric7/HexEdit/HexEditChunks.py

branch
eric7
changeset 9209
b99e7fd55fd3
parent 8881
54e42bc2437a
child 9221
bf71ee032bb4
equal deleted inserted replaced
9208:3fc8dfeb6ebe 9209:b99e7fd55fd3
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2016 - 2022 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the storage backend for the hex editor.
8 """
9
10 import sys
11
12 from PyQt6.QtCore import QBuffer, QIODevice, QByteArray
13
14
class HexEditChunk:
    """
    Class implementing a container for one chunk of locally copied data.
    """

    def __init__(self):
        """
        Constructor

        Creates an empty chunk located at absolute position 0.
        """
        # absolute position of the first byte of this chunk
        self.absPos = 0
        # the bytes held by this chunk
        self.data = bytearray()
        # change markers, kept in parallel to 'data' (one entry per byte)
        self.dataChanged = bytearray()
26
27
class HexEditChunks:
    """
    Class implementing the storage backend for the hex editor.

    When HexEditWidget loads data, HexEditChunks accesses it using a
    QIODevice interface. When the app uses a QByteArray or Python bytearray
    interface, QBuffer is used to provide a QIODevice like interface again.
    No data will be changed, therefore HexEditChunks opens the QIODevice in
    QIODevice.OpenModeFlag.ReadOnly mode. After every access HexEditChunks
    closes the QIODevice. That's why external applications can overwrite
    files while HexEditWidget shows them.

    When the user starts to edit the data, HexEditChunks creates a local
    copy of a chunk of data (4 kilobytes) and notes all changes there. Parallel
    to that chunk, there is a second chunk, which keeps track of which bytes
    are changed and which are not.
    """

    BUFFER_SIZE = 0x10000
    CHUNK_SIZE = 0x1000
    READ_CHUNK_MASK = 0xfffffffffffff000

    def __init__(self, ioDevice=None):
        """
        Constructor

        @param ioDevice io device to get the data from (an empty QBuffer is
            used, if none is given)
        @type QIODevice
        """
        self.__ioDevice = None
        self.__pos = 0
        self.__size = 0
        self.__chunks = []

        self.setIODevice(QBuffer() if ioDevice is None else ioDevice)

    def setIODevice(self, ioDevice):
        """
        Public method to set an io device to read the binary data from.

        All copied chunks are discarded and the current position is reset.

        @param ioDevice io device to get the data from
        @type QIODevice
        @return flag indicating successful operation
        @rtype bool
        """
        self.__ioDevice = ioDevice
        ok = self.__ioDevice.open(QIODevice.OpenModeFlag.ReadOnly)
        if ok:
            # opened successfully; remember the size and release the device
            self.__size = self.__ioDevice.size()
            self.__ioDevice.close()
        else:
            # fallback is an empty buffer
            self.__ioDevice = QBuffer()
            self.__size = 0

        self.__chunks = []
        self.__pos = 0

        return ok

    def data(self, pos=0, maxSize=-1, highlighted=None):
        """
        Public method to get data out of the chunks.

        Edited bytes are taken from the copied chunks, everything else is
        read from the io device.

        @param pos position to get bytes from
        @type int
        @param maxSize maximum amount of bytes to get (negative to get all
            bytes from pos up to the end of the data)
        @type int
        @param highlighted reference to a byte array storing highlighting
            info (it is cleared first and then filled in parallel to the
            returned data)
        @type bytearray
        @return retrieved data
        @rtype bytearray
        """
        buffer = bytearray()

        if highlighted is not None:
            del highlighted[:]

        if pos >= self.__size:
            return buffer

        if maxSize < 0 or (pos + maxSize) > self.__size:
            # never ask for more than is available behind pos
            maxSize = self.__size - pos

        ioDelta = 0
        chunkIdx = 0

        self.__ioDevice.open(QIODevice.OpenModeFlag.ReadOnly)
        try:
            while maxSize > 0:
                # Start position of the next copied chunk lying behind
                # pos. This is a plain local on purpose: the chunk objects
                # in self.__chunks are shared references, so the sentinel
                # must never be written into one of them.
                nextChunkPos = sys.maxsize
                chunksLoopOngoing = True
                while chunkIdx < len(self.__chunks) and chunksLoopOngoing:
                    # In this section, we track changes before our required
                    # data and we take the edited data, if available.
                    # ioDelta is a difference counter to adjust the read
                    # pointer to the original data, if data in between was
                    # deleted or inserted.
                    chunk = self.__chunks[chunkIdx]
                    nextChunkPos = chunk.absPos
                    if nextChunkPos > pos:
                        chunksLoopOngoing = False
                    else:
                        chunkIdx += 1
                        chunkOfs = pos - chunk.absPos
                        if maxSize > (len(chunk.data) - chunkOfs):
                            count = len(chunk.data) - chunkOfs
                            ioDelta += self.CHUNK_SIZE - len(chunk.data)
                        else:
                            count = maxSize
                        if count > 0:
                            buffer += chunk.data[chunkOfs:chunkOfs + count]
                            maxSize -= count
                            pos += count
                            if highlighted is not None:
                                highlighted += chunk.dataChanged[
                                    chunkOfs:chunkOfs + count]

                if maxSize > 0 and pos < nextChunkPos:
                    # In this section, we read data from the original
                    # source. This will only happen, when no copied data is
                    # available.
                    byteCount = min(maxSize, nextChunkPos - pos)

                    maxSize -= byteCount
                    self.__ioDevice.seek(pos + ioDelta)
                    readBuffer = bytearray(self.__ioDevice.read(byteCount))
                    buffer += readBuffer
                    if highlighted is not None:
                        # original data is never highlighted
                        highlighted += bytearray(len(readBuffer))
                    pos += len(readBuffer)
        finally:
            # release the device even if reading failed
            self.__ioDevice.close()

        return buffer

    def write(self, ioDevice, pos=0, count=-1):
        """
        Public method to write data to an io device.

        @param ioDevice io device to write the data to
        @type QIODevice
        @param pos position to write bytes from
        @type int
        @param count amount of bytes to write (-1 to write everything from
            pos up to the end of the data)
        @type int
        @return flag indicating success
        @rtype bool
        """
        if count == -1:
            # write all data
            end = self.__size
        else:
            # 'count' is an amount, so the end position is relative to pos
            end = min(pos + count, self.__size)

        ok = ioDevice.open(QIODevice.OpenModeFlag.WriteOnly)
        if ok:
            try:
                idx = pos
                while idx < end:
                    # the last block may be shorter than a full buffer
                    block = self.data(idx, min(self.BUFFER_SIZE, end - idx))
                    ioDevice.write(QByteArray(block))
                    idx += self.BUFFER_SIZE
            finally:
                ioDevice.close()

        return ok

    def setDataChanged(self, pos, dataChanged):
        """
        Public method to set highlighting info.

        @param pos position to set highlighting info for
        @type int
        @param dataChanged flag indicating changed data
        @type bool
        """
        if pos < 0 or pos >= self.__size:
            # position is out of range, do nothing
            return

        chunk = self.__chunks[self.__getChunkIndex(pos)]
        chunk.dataChanged[pos - chunk.absPos] = int(dataChanged)

    def dataChanged(self, pos):
        """
        Public method to test, if some data was changed.

        @param pos byte position to check
        @type int
        @return flag indicating the changed state (False for positions
            outside of the data)
        @rtype bool
        """
        highlighted = bytearray()
        self.data(pos, 1, highlighted)
        # convert explicitly; an empty bytearray must not leak out as the
        # result
        return bool(highlighted) and bool(highlighted[0])

    def indexOf(self, byteArray, start):
        """
        Public method to search the first occurrence of some data.

        @param byteArray data to search for
        @type bytearray
        @param start position to start the search at
        @type int
        @return position the data was found at or -1 if nothing could be found
        @rtype int
        """
        ba = bytearray(byteArray)
        # consecutive windows overlap by len(ba) - 1 bytes, so a match
        # crossing a window border is not missed
        windowSize = self.BUFFER_SIZE + len(ba) - 1

        pos = start
        while pos < self.__size:
            findPos = self.data(pos, windowSize).find(ba)
            if findPos >= 0:
                return pos + findPos

            pos += self.BUFFER_SIZE

        return -1

    def lastIndexOf(self, byteArray, start):
        """
        Public method to search the last occurrence of some data.

        The search runs backwards through the data in front of the start
        position.

        @param byteArray data to search for
        @type bytearray
        @param start position to start the search at
        @type int
        @return position the data was found at or -1 if nothing could be found
        @rtype int
        """
        ba = bytearray(byteArray)

        pos = start
        while pos > 0:
            # window [sPos, pos), extended by len(ba) - 1 bytes to overlap
            # with the next (lower) window
            sPos = max(0, pos - self.BUFFER_SIZE - len(ba) + 1)

            findPos = self.data(sPos, pos - sPos).rfind(ba)
            if findPos >= 0:
                return sPos + findPos

            # decrement loop variable
            pos -= self.BUFFER_SIZE

        return -1

    def insert(self, pos, data):
        """
        Public method to insert a byte.

        @param pos position to insert at
        @type int
        @param data byte to insert
        @type int (range 0 to 255)
        @return flag indicating success
        @rtype bool
        """
        if pos < 0 or pos > self.__size:
            # position is out of range, do nothing
            return False

        # Appending behind the last byte reuses the chunk holding that byte.
        # Empty data has no such byte, so position 0 is looked up instead
        # of the invalid position -1.
        lookupPos = pos - 1 if (pos == self.__size and pos > 0) else pos
        chunkIdx = self.__getChunkIndex(lookupPos)
        chunk = self.__chunks[chunkIdx]
        posInChunk = pos - chunk.absPos
        chunk.data.insert(posInChunk, data)
        chunk.dataChanged.insert(posInChunk, 1)
        # all chunks behind the modified one move up by one byte
        for follower in self.__chunks[chunkIdx + 1:]:
            follower.absPos += 1
        self.__size += 1
        self.__pos = pos
        return True

    def overwrite(self, pos, data):
        """
        Public method to overwrite a byte.

        @param pos position to overwrite
        @type int
        @param data byte to overwrite with
        @type int (range 0 to 255)
        @return flag indicating success
        @rtype bool
        """
        if pos < 0 or pos >= self.__size:
            # position is out of range, do nothing
            return False

        chunk = self.__chunks[self.__getChunkIndex(pos)]
        posInChunk = pos - chunk.absPos
        chunk.data[posInChunk] = data
        chunk.dataChanged[posInChunk] = 1
        self.__pos = pos
        return True

    def removeAt(self, pos):
        """
        Public method to remove a byte.

        @param pos position to remove
        @type int
        @return flag indicating success
        @rtype bool
        """
        if pos < 0 or pos >= self.__size:
            # position is out of range, do nothing
            return False

        chunkIdx = self.__getChunkIndex(pos)
        chunk = self.__chunks[chunkIdx]
        posInChunk = pos - chunk.absPos
        del chunk.data[posInChunk]
        del chunk.dataChanged[posInChunk]
        # all chunks behind the modified one move down by one byte
        for follower in self.__chunks[chunkIdx + 1:]:
            follower.absPos -= 1
        self.__size -= 1
        self.__pos = pos
        return True

    def __getitem__(self, pos):
        """
        Special method to get a byte at a position.

        Note: This realizes the [] get operator. Positions behind the end of
        the data yield 0 instead of raising an IndexError.

        @param pos position of byte to get
        @type int
        @return requested byte
        @rtype int (range 0 to 255)
        """
        if pos >= self.__size:
            return 0

        return self.data(pos, 1)[0]

    def pos(self):
        """
        Public method to get the current position.

        @return current position (the position of the last successful
            insert, overwrite or remove operation)
        @rtype int
        """
        return self.__pos

    def size(self):
        """
        Public method to get the current data size.

        @return current data size
        @rtype int
        """
        return self.__size

    def __getChunkIndex(self, absPos):
        """
        Private method to get the chunk index for a position.

        This method checks, if there is already a copied chunk available. If
        there is one, it returns its index. If there is no copied chunk
        available, original data will be copied into a new chunk.

        @param absPos absolute position of the data.
        @type int
        @return index of the chunk containing the position
        @rtype int
        """
        insertIdx = 0
        # accumulated size difference between the copied chunks in front of
        # absPos and the original data they were read from
        ioDelta = 0

        for idx, chunk in enumerate(self.__chunks):
            if chunk.absPos <= absPos < (chunk.absPos + len(chunk.data)):
                # a copied chunk covers the position already
                return idx

            if absPos < chunk.absPos:
                # the position lies in front of this chunk
                insertIdx = idx
                break

            ioDelta += len(chunk.data) - self.CHUNK_SIZE
            insertIdx = idx + 1

        # no copied chunk available; create one from the original data
        readAbsPos = absPos - ioDelta
        # align the read position to the start of a chunk
        readPos = readAbsPos & self.READ_CHUNK_MASK
        self.__ioDevice.open(QIODevice.OpenModeFlag.ReadOnly)
        self.__ioDevice.seek(readPos)
        rawData = bytearray(self.__ioDevice.read(self.CHUNK_SIZE))
        self.__ioDevice.close()

        newChunk = HexEditChunk()
        newChunk.data = rawData
        newChunk.absPos = absPos - (readAbsPos - readPos)
        newChunk.dataChanged = bytearray(len(rawData))
        self.__chunks.insert(insertIdx, newChunk)

        return insertIdx

eric ide

mercurial