eric6/MicroPython/MicroPythonFileSystem.py

branch
micropython
changeset 7070
3368ce0e7879
parent 7067
3fc4082fc6ba
child 7077
3b7475b7a1ef
equal deleted inserted replaced
7069:a09a30251d4e 7070:3368ce0e7879
7 Module implementing some file system commands for MicroPython. 7 Module implementing some file system commands for MicroPython.
8 """ 8 """
9 9
10 from __future__ import unicode_literals 10 from __future__ import unicode_literals
11 11
12 from PyQt5.QtCore import QObject 12 import ast
13 import time
14 import stat
15
16 from PyQt5.QtCore import QObject, QThread
13 17
14 18
15 class MicroPythonFileSystem(QObject): 19 class MicroPythonFileSystem(QObject):
16 """ 20 """
17 Class implementing some file system commands for MicroPython. 21 Class implementing some file system commands for MicroPython.
23 <li>lls: directory listing with meta data</li> 27 <li>lls: directory listing with meta data</li>
24 <li>cd: change directory</li> 28 <li>cd: change directory</li>
25 <li>pwd: get the current directory</li> 29 <li>pwd: get the current directory</li>
26 <li>put: copy a file to the connected device</li> 30 <li>put: copy a file to the connected device</li>
27 <li>get: get a file from the connected device</li> 31 <li>get: get a file from the connected device</li>
32 <li>rm: remove a file from the connected device</li>
33 <li>mkdir: create a new directory</li>
34 <li>rmdir: remove an empty directory</li>
28 </ul> 35 </ul>
29 """ 36 """
30 def __init__(self, parent=None): 37 def __init__(self, parent=None):
31 """ 38 """
32 Constructor 39 Constructor
33 40
34 @param parent reference to the parent object 41 @param parent reference to the parent object
35 @type QObject 42 @type QObject
36 """ 43 """
37 super(MicroPythonFileSystem, self).__init__(parent) 44 super(MicroPythonFileSystem, self).__init__(parent)
38 45
39 def ls(self): 46 self.__serial = None
47
48 def setSerial(self, serial):
49 """
50 Public method to set the serial port to be used.
51
52 Note: The serial port should be initialized and open already.
53
54 @param serial open serial port
55 @type MicroPythonSerialPort
56 """
57 self.__serial = serial
58
59 def __rawOn(self):
60 """
61 Private method to switch the connected device to 'raw' mode.
62
63 Note: switching to raw mode is done with synchroneous writes.
64 """
65 if not self.__serial:
66 return
67
68 rawReplMessage = b"raw REPL; CTRL-B to exit\r\n"
69 softRebootMessage = b"soft reboot\r\n"
70
71 self.__serial.write(b"\x02") # end raw mode if required
72 self.__serial.waitForBytesWritten()
73 for _i in range(3):
74 # CTRL-C three times to break out of loops
75 self.__serial.write(b"\r\x03")
76 self.__serial.waitForBytesWritten()
77 QThread.msleep(10)
78 self.__serial.readAll() # read all data and discard it
79 self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode
80 self.__serial.readUntil(rawReplMessage)
81 self.__serial.write(b"\x04") # send CTRL-D to soft reset
82 self.__serial.readUntil(softRebootMessage)
83
84 # some MicroPython devices seem to need to be convinced in some
85 # special way
86 data = self.__serial.readUntil(rawReplMessage)
87 if not data.endswith(rawReplMessage):
88 self.__serial.write(b"\r\x01") # send CTRL-A again
89 self.__serial.readUntil(rawReplMessage)
90 self.__serial.readAll() # read all data and discard it
91
92 def __rawOff(self):
93 """
94 Private method to switch 'raw' mode off.
95 """
96 if self.__serial:
97 self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode
98
99 def __execute(self, commands):
100 """
101 Private method to send commands to the connected device and return the
102 result.
103
104 If no serial connection is available, empty results will be returned.
105
106 @param commands list of commands to be executed
107 @type str
108 @return tuple containing stdout and stderr output of the device
109 @rtype tuple of (bytes, bytes)
110 """
111 if not self.__serial:
112 return b"", b""
113
114 result = bytearray()
115 err = b""
116
117 self.__rawOn()
118 QThread.msleep(10)
119 for command in commands:
120 if command:
121 commandBytes = command.encode("utf-8")
122 self.__serial.write(commandBytes + b"\x04")
123 # read until prompt
124 response = self.__serial.readUntil(b"\x04>")
125 # split stdout, stderr
126 out, err = response[2:-2].split(b"\x04")
127 result += out
128 if err:
129 return b"", err
130 QThread.msleep(10)
131 self.__rawOff()
132
133 return bytes(result), err
134
135 def __shortError(self, error):
136 """
137 Private method to create a shortened error message.
138
139 @param error verbose error message
140 @type bytes
141 @return shortened error message
142 @rtype str
143 """
144 if error:
145 decodedError = error.decode("utf-8")
146 try:
147 return decodedError.split["\r\n"][-2]
148 except Exception:
149 return decodedError
150 return self.tr("Detected an error without indications.")
151
152 ##################################################################
153 ## Methods below implement the file system commands
154 ##################################################################
155
156 def ls(self, dirname=""):
40 """ 157 """
41 Public method to get a directory listing of the connected device. 158 Public method to get a directory listing of the connected device.
42 159
160 @param dirname name of the directory to be listed
161 @type str
43 @return tuple containg the directory listing 162 @return tuple containg the directory listing
44 @rtype tuple of str 163 @rtype tuple of str
45 """ 164 @exception IOError raised to indicate an issue with the device
46 # TODO: not implemented yet 165 """
47 166 commands = [
48 def lls(self): 167 "import os",
168 "print(os.listdir('{0}'))".format(dirname),
169 ]
170 out, err = self.__execute(commands)
171 if err:
172 raise IOError(self.__shortError(err))
173 return ast.literal_eval(out.decode("utf-8"))
174
175 def lls(self, dirname=""):
49 """ 176 """
50 Public method to get a long directory listing of the connected device 177 Public method to get a long directory listing of the connected device
51 including meta data. 178 including meta data.
52 179
53 @return tuple containg the the directory listing with tuple entries 180 @param dirname name of the directory to be listed
54 containing the name, size, time and mode 181 @type str
182 @return list containing the the directory listing with tuple entries
183 of the name and and a tuple of mode, size and time
55 @rtype tuple of str 184 @rtype tuple of str
56 """ 185 @exception IOError raised to indicate an issue with the device
57 # TODO: not implemented yet 186 """
58 187 commands = [
59 def cd(self, path): 188 "import os",
189 "\n".join([
190 "def stat(filename):",
191 " try:",
192 " rstat = os.lstat(filename)",
193 " except:",
194 " rstat = os.stat(filename)",
195 " return tuple(rstat)",
196 ]),
197 "\n".join([
198 "def listdir_stat(dirname):",
199 " try:",
200 " files = os.listdir(dirname)",
201 " except OSError:",
202 " return []",
203 " if dirname in ('', '/'):",
204 " return list((f, stat(f)) for f in files)",
205 " return list((f, stat(dirname + '/' + f)) for f in files)",
206 ]),
207 "print(listdir_stat('{0}'))".format(dirname),
208 ]
209 out, err = self.__execute(commands)
210 if err:
211 raise IOError(self.__shortError(err))
212 fileslist = ast.literal_eval(out.decode("utf-8"))
213 return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
214
215 def cd(self, dirname):
60 """ 216 """
61 Public method to change the current directory on the connected device. 217 Public method to change the current directory on the connected device.
62 218
63 @param path directory to change to 219 @param dirname directory to change to
64 @type str 220 @type str
65 """ 221 @exception IOError raised to indicate an issue with the device
66 # TODO: not implemented yet 222 """
223 assert dirname
224
225 commands = [
226 "import os",
227 "os.chdir('{0}')".format(dirname),
228 ]
229 out, err = self.__execute(commands)
230 if err:
231 raise IOError(self.__shortError(err))
67 232
68 def pwd(self): 233 def pwd(self):
69 """ 234 """
70 Public method to get the current directory of the connected device. 235 Public method to get the current directory of the connected device.
71 236
72 @return current directory 237 @return current directory
73 @rtype str 238 @rtype str
74 """ 239 @exception IOError raised to indicate an issue with the device
75 # TODO: not implemented yet 240 """
241 commands = [
242 "import os",
243 "print(os.getcwd())",
244 ]
245 out, err = self.__execute(commands)
246 if err:
247 raise IOError(self.__shortError(err))
248 return out.decode("utf-8").strip()
249
250 def rm(self, filename):
251 """
252 Public method to remove a file from the connected device.
253
254 @param filename name of the file to be removed
255 @type str
256 @exception IOError raised to indicate an issue with the device
257 """
258 assert filename
259
260 commands = [
261 "import os",
262 "os.remove('{0}')".format(filename),
263 ]
264 out, err = self.__execute(commands)
265 if err:
266 raise IOError(self.__shortError(err))
267
268 def mkdir(self, dirname):
269 """
270 Public method to create a new directory.
271
272 @param dirname name of the directory to create
273 @type str
274 @exception IOError raised to indicate an issue with the device
275 """
276 assert dirname
277
278 commands = [
279 "import os",
280 "os.mkdir('{0}')".format(dirname),
281 ]
282 out, err = self.__execute(commands)
283 if err:
284 raise IOError(self.__shortError(err))
285
286 def rmdir(self, dirname):
287 """
288 Public method to remove a directory.
289
290 @param dirname name of the directory to be removed
291 @type str
292 @exception IOError raised to indicate an issue with the device
293 """
294 assert dirname
295
296 commands = [
297 "import os",
298 "os.rmdir('{0}')".format(dirname),
299 ]
300 out, err = self.__execute(commands)
301 if err:
302 raise IOError(self.__shortError(err))
76 303
77 def put(self, hostFileName, deviceFileName): 304 def put(self, hostFileName, deviceFileName):
78 """ 305 """
79 Public method to copy a local file to the connected device. 306 Public method to copy a local file to the connected device.
80 307
82 @type str 309 @type str
83 @param deviceFileName name of the file to copy to 310 @param deviceFileName name of the file to copy to
84 @type str 311 @type str
85 @return flag indicating success 312 @return flag indicating success
86 @rtype bool 313 @rtype bool
314 @exception IOError raised to indicate an issue with the device
87 """ 315 """
88 # TODO: not implemented yet 316 # TODO: not implemented yet
89 317
90 def get(self, deviceFileName, hostFileName): 318 def get(self, deviceFileName, hostFileName):
91 """ 319 """
95 @type str 323 @type str
96 @param hostFileName name of the file to copy to 324 @param hostFileName name of the file to copy to
97 @type str 325 @type str
98 @return flag indicating success 326 @return flag indicating success
99 @rtype bool 327 @rtype bool
328 @exception IOError raised to indicate an issue with the device
100 """ 329 """
101 # TODO: not implemented yet 330 # TODO: not implemented yet
331
332 ##################################################################
333 ## Utility methods below
334 ##################################################################
335
336 def mtime2string(self, mtime):
337 """
338 Public method to convert a time value to a string representation.
339
340 @param mtime time value
341 @type int
342 @return string representation of the given time
343 @rtype str
344 """
345 return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime))
346
347 def mode2string(self, mode):
348 """
349 Public method to convert a mode value to a string representation.
350
351 @param mode mode value
352 @type int
353 @return string representation of the given mode value
354 @rtype str
355 """
356 return stat.filemode(mode)
357 # TODO: remove this
358 ##
359 ##if __name__ == "__main__":
360 ## from PyQt5.QtCore import QCoreApplication, QTimer
361 ## from MicroPythonSerialPort import MicroPythonSerialPort
362 ##
363 ## app = QCoreApplication([])
364 ##
365 ## serial = MicroPythonSerialPort()
366 ## serial.openSerialLink("/dev/ttyUSB0")
367 ## fs = MicroPythonFileSystem()
368 ## fs.setSerial(serial)
369 ##
370 ## def tf():
371 ## fs.cd("/flash")
372 ## print(fs.pwd())
373 ## fs.cd("odroid_go")
374 ## print(fs.pwd())
375 ## ll = fs.lls()
376 ## print(ll)
377 ## for f, (m, s, t) in ll:
378 ## print(fs.mode2string(m), s, fs.mtime2string(t), f)
379 ## fs.cd("..")
380 ## print(fs.pwd())
381 ## ll = fs.lls("odroid_go")
382 ## print(ll)
383 ## for f, (m, s, t) in ll:
384 ## print(fs.mode2string(m), s, fs.mtime2string(t), f)
385 ##
386 ## QTimer.singleShot(0, tf)
387 ## app.exec_()

eric ide

mercurial