eric7/HexEdit/HexEditChunks.py

branch
eric7
changeset 8312
800c432b34c8
parent 8259
2bbec88047dd
child 8318
962bce857696
diff -r 4e8b98454baa -r 800c432b34c8 eric7/HexEdit/HexEditChunks.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/eric7/HexEdit/HexEditChunks.py	Sat May 15 18:45:04 2021 +0200
@@ -0,0 +1,447 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2016 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing the storage backend for the hex editor.
+"""
+
+import sys
+
+from PyQt5.QtCore import QBuffer, QIODevice, QByteArray
+
+
+class HexEditChunk:
+    """
+    Class implementing a container for the data chunks.
+    """
+    def __init__(self):
+        """
+        Constructor
+        """
+        self.data = bytearray()
+        self.dataChanged = bytearray()
+        self.absPos = 0
+
+
+class HexEditChunks:
+    """
+    Class implementing the storage backend for the hex editor.
+    
+    When HexEditWidget loads data, HexEditChunks access them using a QIODevice
+    interface. When the app uses a QByteArray or Python bytearray interface,
+    QBuffer is used to provide again a QIODevice like interface. No data will
+    be changed, therefore HexEditChunks opens the QIODevice in
+    QIODevice.OpenModeFlag.ReadOnly mode. After every access HexEditChunks
+    closes the QIODevice. That's why external applications can overwrite
+    files while HexEditWidget shows them.
+
+    When the the user starts to edit the data, HexEditChunks creates a local
+    copy of a chunk of data (4 kilobytes) and notes all changes there. Parallel
+    to that chunk, there is a second chunk, which keeps track of which bytes
+    are changed and which are not.
+    """
+    BUFFER_SIZE = 0x10000
+    CHUNK_SIZE = 0x1000
+    READ_CHUNK_MASK = 0xfffffffffffff000
+    
+    def __init__(self, ioDevice=None):
+        """
+        Constructor
+        
+        @param ioDevice io device to get the data from
+        @type QIODevice
+        """
+        self.__ioDevice = None
+        self.__pos = 0
+        self.__size = 0
+        self.__chunks = []
+        
+        if ioDevice is None:
+            buf = QBuffer()
+            self.setIODevice(buf)
+        else:
+            self.setIODevice(ioDevice)
+    
+    def setIODevice(self, ioDevice):
+        """
+        Public method to set an io device to read the binary data from.
+        
+        @param ioDevice io device to get the data from
+        @type QIODevice
+        @return flag indicating successful operation
+        @rtype bool
+        """
+        self.__ioDevice = ioDevice
+        ok = self.__ioDevice.open(QIODevice.OpenModeFlag.ReadOnly)
+        if ok:
+            # open successfully
+            self.__size = self.__ioDevice.size()
+            self.__ioDevice.close()
+        else:
+            # fallback is an empty buffer
+            self.__ioDevice = QBuffer()
+            self.__size = 0
+        
+        self.__chunks = []
+        self.__pos = 0
+        
+        return ok
+    
+    def data(self, pos=0, maxSize=-1, highlighted=None):
+        """
+        Public method to get data out of the chunks.
+        
+        @param pos position to get bytes from
+        @type int
+        @param maxSize maximum amount of bytes to get
+        @type int
+        @param highlighted reference to a byte array storing highlighting info
+        @type bytearray
+        @return retrieved data
+        @rtype bytearray
+        """
+        ioDelta = 0
+        chunkIdx = 0
+        
+        chunk = HexEditChunk()
+        buffer = bytearray()
+        
+        if highlighted is not None:
+            del highlighted[:]
+        
+        if pos >= self.__size:
+            return buffer
+        
+        if maxSize < 0:
+            maxSize = self.__size
+        elif (pos + maxSize) > self.__size:
+            maxSize = self.__size - pos
+        
+        self.__ioDevice.open(QIODevice.OpenModeFlag.ReadOnly)
+        
+        while maxSize > 0:
+            chunk.absPos = sys.maxsize
+            chunksLoopOngoing = True
+            while chunkIdx < len(self.__chunks) and chunksLoopOngoing:
+                # In this section, we track changes before our required data
+                # and we take the edited data, if availible. ioDelta is a
+                # difference counter to justify the read pointer to the
+                # original data, if data in between was deleted or inserted.
+                
+                chunk = self.__chunks[chunkIdx]
+                if chunk.absPos > pos:
+                    chunksLoopOngoing = False
+                else:
+                    chunkIdx += 1
+                    chunkOfs = pos - chunk.absPos
+                    if maxSize > (len(chunk.data) - chunkOfs):
+                        count = len(chunk.data) - chunkOfs
+                        ioDelta += self.CHUNK_SIZE - len(chunk.data)
+                    else:
+                        count = maxSize
+                    if count > 0:
+                        buffer += chunk.data[chunkOfs:chunkOfs + count]
+                        maxSize -= count
+                        pos += count
+                        if highlighted is not None:
+                            highlighted += chunk.dataChanged[
+                                chunkOfs:chunkOfs + count]
+            
+            if maxSize > 0 and pos < chunk.absPos:
+                # In this section, we read data from the original source. This
+                # will only happen, when no copied data is available.
+                if chunk.absPos - pos > maxSize:
+                    byteCount = maxSize
+                else:
+                    byteCount = chunk.absPos - pos
+                
+                maxSize -= byteCount
+                self.__ioDevice.seek(pos + ioDelta)
+                readBuffer = bytearray(self.__ioDevice.read(byteCount))
+                buffer += readBuffer
+                if highlighted is not None:
+                    highlighted += bytearray(len(readBuffer))
+                pos += len(readBuffer)
+        
+        self.__ioDevice.close()
+        return buffer
+    
+    def write(self, ioDevice, pos=0, count=-1):
+        """
+        Public method to write data to an io device.
+        
+        @param ioDevice io device to write the data to
+        @type QIODevice
+        @param pos position to write bytes from
+        @type int
+        @param count amount of bytes to write
+        @type int
+        @return flag indicating success
+        @rtype bool
+        """
+        if count == -1:
+            # write all data
+            count = self.__size
+        
+        ok = ioDevice.open(QIODevice.OpenModeFlag.WriteOnly)
+        if ok:
+            idx = pos
+            while idx < count:
+                data = self.data(idx, self.BUFFER_SIZE)
+                ioDevice.write(QByteArray(data))
+                
+                # increment loop variable
+                idx += self.BUFFER_SIZE
+            
+            ioDevice.close()
+        
+        return ok
+    
+    def setDataChanged(self, pos, dataChanged):
+        """
+        Public method to set highlighting info.
+        
+        @param pos position to set highlighting info for
+        @type int
+        @param dataChanged flag indicating changed data
+        @type bool
+        """
+        if pos < 0 or pos >= self.__size:
+            # position is out of range, do nothing
+            return
+        chunkIdx = self.__getChunkIndex(pos)
+        posInChunk = pos - self.__chunks[chunkIdx].absPos
+        self.__chunks[chunkIdx].dataChanged[posInChunk] = int(dataChanged)
+    
+    def dataChanged(self, pos):
+        """
+        Public method to test, if some data was changed.
+        
+        @param pos byte position to check
+        @type int
+        @return flag indicating the changed state
+        @rtype bool
+        """
+        highlighted = bytearray()
+        self.data(pos, 1, highlighted)
+        return highlighted and bool(highlighted[0])
+    
+    def indexOf(self, byteArray, start):
+        """
+        Public method to search the first occurrence of some data.
+        
+        @param byteArray data to search for
+        @type bytearray
+        @param start position to start the search at
+        @type int
+        @return position the data was found at or -1 if nothing could be found
+        @rtype int
+        """
+        ba = bytearray(byteArray)
+        
+        result = -1
+        pos = start
+        while pos < self.__size:
+            buffer = self.data(pos, self.BUFFER_SIZE + len(ba) - 1)
+            findPos = buffer.find(ba)
+            if findPos >= 0:
+                result = pos + findPos
+                break
+            
+            # increment loop variable
+            pos += self.BUFFER_SIZE
+        
+        return result
+    
+    def lastIndexOf(self, byteArray, start):
+        """
+        Public method to search the last occurrence of some data.
+        
+        @param byteArray data to search for
+        @type bytearray
+        @param start position to start the search at
+        @type int
+        @return position the data was found at or -1 if nothing could be found
+        @rtype int
+        """
+        ba = bytearray(byteArray)
+        
+        result = -1
+        pos = start
+        while pos > 0 and result < 0:
+            sPos = pos - self.BUFFER_SIZE - len(ba) + 1
+            if sPos < 0:
+                sPos = 0
+            
+            buffer = self.data(sPos, pos - sPos)
+            findPos = buffer.rfind(ba)
+            if findPos >= 0:
+                result = sPos + findPos
+                break
+            
+            # increment loop variable
+            pos -= self.BUFFER_SIZE
+        
+        return result
+    
+    def insert(self, pos, data):
+        """
+        Public method to insert a byte.
+        
+        @param pos position to insert at
+        @type int
+        @param data byte to insert
+        @type int (range 0 to 255)
+        @return flag indicating success
+        @rtype bool
+        """
+        if pos < 0 or pos > self.__size:
+            # position is out of range, do nothing
+            return False
+        
+        chunkIdx = (
+            self.__getChunkIndex(pos - 1)
+            if pos == self.__size else
+            self.__getChunkIndex(pos)
+        )
+        chunk = self.__chunks[chunkIdx]
+        posInChunk = pos - chunk.absPos
+        chunk.data.insert(posInChunk, data)
+        chunk.dataChanged.insert(posInChunk, 1)
+        for idx in range(chunkIdx + 1, len(self.__chunks)):
+            self.__chunks[idx].absPos += 1
+        self.__size += 1
+        self.__pos = pos
+        return True
+    
+    def overwrite(self, pos, data):
+        """
+        Public method to overwrite a byte.
+        
+        @param pos position to overwrite
+        @type int
+        @param data byte to overwrite with
+        @type int (range 0 to 255)
+        @return flag indicating success
+        @rtype bool
+        """
+        if pos < 0 or pos >= self.__size:
+            # position is out of range, do nothing
+            return False
+        
+        chunkIdx = self.__getChunkIndex(pos)
+        chunk = self.__chunks[chunkIdx]
+        posInChunk = pos - chunk.absPos
+        chunk.data[posInChunk] = data
+        chunk.dataChanged[posInChunk] = 1
+        self.__pos = pos
+        return True
+    
+    def removeAt(self, pos):
+        """
+        Public method to remove a byte.
+        
+        @param pos position to remove
+        @type int
+        @return flag indicating success
+        @rtype bool
+        """
+        if pos < 0 or pos >= self.__size:
+            # position is out of range, do nothing
+            return False
+        
+        chunkIdx = self.__getChunkIndex(pos)
+        chunk = self.__chunks[chunkIdx]
+        posInChunk = pos - chunk.absPos
+        chunk.data.pop(posInChunk)
+        chunk.dataChanged.pop(posInChunk)
+        for idx in range(chunkIdx + 1, len(self.__chunks)):
+            self.__chunks[idx].absPos -= 1
+        self.__size -= 1
+        self.__pos = pos
+        return True
+    
+    def __getitem__(self, pos):
+        """
+        Special method to get a byte at a position.
+        
+        Note: This realizes the [] get operator.
+        
+        @param pos position of byte to get
+        @type int
+        @return requested byte
+        @rtype int (range 0 to 255)
+        """
+        if pos >= self.__size:
+            return 0
+##            raise IndexError
+        
+        return self.data(pos, 1)[0]
+    
+    def pos(self):
+        """
+        Public method to get the current position.
+        
+        @return current position
+        @rtype int
+        """
+        return self.__pos
+    
+    def size(self):
+        """
+        Public method to get the current data size.
+        
+        @return current data size
+        @rtype int
+        """
+        return self.__size
+    
+    def __getChunkIndex(self, absPos):
+        """
+        Private method to get the chunk index for a position.
+        
+        This method checks, if there is already a copied chunk available. If
+        there is one, it returns its index. If there is no copied chunk
+        available, original data will be copied into a new chunk.
+        
+        @param absPos absolute position of the data.
+        @type int
+        @return index of the chunk containing the position
+        @rtype int
+        """
+        foundIdx = -1
+        insertIdx = 0
+        ioDelta = 0
+        
+        for idx in range(len(self.__chunks)):
+            chunk = self.__chunks[idx]
+            if (
+                absPos >= chunk.absPos and
+                absPos < (chunk.absPos + len(chunk.data))
+            ):
+                foundIdx = idx
+                break
+            
+            if absPos < chunk.absPos:
+                insertIdx = idx
+                break
+            
+            ioDelta += len(chunk.data) - self.CHUNK_SIZE
+            insertIdx = idx + 1
+        
+        if foundIdx == -1:
+            newChunk = HexEditChunk()
+            readAbsPos = absPos - ioDelta
+            readPos = readAbsPos & self.READ_CHUNK_MASK
+            self.__ioDevice.open(QIODevice.OpenModeFlag.ReadOnly)
+            self.__ioDevice.seek(readPos)
+            newChunk.data = bytearray(self.__ioDevice.read(self.CHUNK_SIZE))
+            self.__ioDevice.close()
+            newChunk.absPos = absPos - (readAbsPos - readPos)
+            newChunk.dataChanged = bytearray(len(newChunk.data))
+            self.__chunks.insert(insertIdx, newChunk)
+            foundIdx = insertIdx
+        
+        return foundIdx

eric ide

mercurial