23 <li>lls: directory listing with meta data</li> |
27 <li>lls: directory listing with meta data</li> |
24 <li>cd: change directory</li> |
28 <li>cd: change directory</li> |
25 <li>pwd: get the current directory</li> |
29 <li>pwd: get the current directory</li> |
26 <li>put: copy a file to the connected device</li> |
30 <li>put: copy a file to the connected device</li> |
27 <li>get: get a file from the connected device</li> |
31 <li>get: get a file from the connected device</li> |
|
32 <li>rm: remove a file from the connected device</li> |
|
33 <li>mkdir: create a new directory</li> |
|
34 <li>rmdir: remove an empty directory</li> |
28 </ul> |
35 </ul> |
29 """ |
36 """ |
def __init__(self, parent=None):
    """
    Constructor

    @param parent reference to the parent object
    @type QObject
    """
    super(MicroPythonFileSystem, self).__init__(parent)

    # no serial port is attached yet; the file system commands check this
    # attribute before talking to a device
    self.__serial = None
|
47 |
|
def setSerial(self, serial):
    """
    Public method to set the serial port to be used.

    Note: The serial port should be initialized and open already.

    @param serial open serial port
    @type MicroPythonSerialPort
    """
    self.__serial = serial
|
58 |
|
def __rawOn(self):
    """
    Private method to switch the connected device to 'raw' mode.

    Note: switching to raw mode is done with synchronous writes.

    Nothing is done if no serial port has been set.
    """
    # NOTE(review): the indentation of this method was reconstructed; the
    # extent of the loop body and of the final 'if' should be confirmed
    # against the upstream file.
    if not self.__serial:
        return

    rawReplMessage = b"raw REPL; CTRL-B to exit\r\n"
    softRebootMessage = b"soft reboot\r\n"

    self.__serial.write(b"\x02")        # end raw mode if required
    self.__serial.waitForBytesWritten()
    for _i in range(3):
        # CTRL-C three times to break out of loops
        self.__serial.write(b"\r\x03")
        self.__serial.waitForBytesWritten()
        QThread.msleep(10)
    self.__serial.readAll()             # read all data and discard it
    self.__serial.write(b"\r\x01")      # send CTRL-A to enter raw mode
    self.__serial.readUntil(rawReplMessage)
    self.__serial.write(b"\x04")        # send CTRL-D to soft reset
    self.__serial.readUntil(softRebootMessage)

    # some MicroPython devices seem to need to be convinced in some
    # special way
    data = self.__serial.readUntil(rawReplMessage)
    if not data.endswith(rawReplMessage):
        self.__serial.write(b"\r\x01")  # send CTRL-A again
        self.__serial.readUntil(rawReplMessage)
    self.__serial.readAll()             # read all data and discard it
|
91 |
|
def __rawOff(self):
    """
    Private method to switch 'raw' mode off.

    Nothing is sent if no serial port has been set.
    """
    if self.__serial:
        self.__serial.write(b"\x02")    # send CTRL-B to cancel raw mode
|
98 |
|
def __execute(self, commands):
    """
    Private method to send commands to the connected device and return the
    result.

    If no serial connection is available, empty results will be returned.
    Execution stops at the first command that produces error output; in
    that case the collected standard output is discarded.

    @param commands list of commands to be executed
    @type list of str
    @return tuple containing stdout and stderr output of the device
    @rtype tuple of (bytes, bytes)
    """
    if not self.__serial:
        return b"", b""

    result = bytearray()
    err = b""

    self.__rawOn()
    try:
        QThread.msleep(10)
        for command in commands:
            if command:
                commandBytes = command.encode("utf-8")
                # CTRL-D terminates the command
                self.__serial.write(commandBytes + b"\x04")
                # read until prompt
                response = self.__serial.readUntil(b"\x04>")
                # drop the two leading and two trailing bytes, then
                # split stdout, stderr
                out, err = response[2:-2].split(b"\x04")
                result += out
                if err:
                    return b"", err
    finally:
        # always switch raw mode off again, also if a command reported
        # an error or the response could not be split
        QThread.msleep(10)
        self.__rawOff()

    return bytes(result), err
|
134 |
|
def __shortError(self, error):
    """
    Private method to create a shortened error message.

    The shortened message is the second to last element of the message
    split at CR/LF line ends. If the message has fewer than two such
    elements, it is returned unchanged.

    @param error verbose error message
    @type bytes
    @return shortened error message
    @rtype str
    """
    if error:
        decodedError = error.decode("utf-8")
        try:
            # split() must be called; subscripting the method itself
            # always raised and silently returned the verbose message
            return decodedError.split("\r\n")[-2]
        except IndexError:
            return decodedError
    return self.tr("Detected an error without indications.")
|
151 |
|
152 ################################################################## |
|
153 ## Methods below implement the file system commands |
|
154 ################################################################## |
|
155 |
|
def ls(self, dirname=""):
    """
    Public method to get a directory listing of the connected device.

    @param dirname name of the directory to be listed
    @type str
    @return directory listing as printed by os.listdir() on the device
    @rtype list of str
    @exception IOError raised to indicate an issue with the device
    """
    commands = [
        "import os",
        # {0!r} emits a properly quoted and escaped string literal, so
        # names containing quotes or backslashes do not break the command
        "print(os.listdir({0!r}))".format(dirname),
    ]
    out, err = self.__execute(commands)
    if err:
        raise IOError(self.__shortError(err))
    # the device printed a Python literal; evaluate it without executing
    # arbitrary code
    return ast.literal_eval(out.decode("utf-8"))
|
174 |
|
def lls(self, dirname=""):
    """
    Public method to get a long directory listing of the connected device
    including meta data.

    @param dirname name of the directory to be listed
    @type str
    @return list containing the directory listing with tuple entries
        of the name and a tuple of the stat result elements 0, 6 and 8
        (documented upstream as mode, size and time)
    @rtype list of tuple of (str, tuple)
    @exception IOError raised to indicate an issue with the device
    """
    # NOTE(review): the indentation inside the device side code strings
    # was reconstructed; confirm against the upstream file.
    commands = [
        "import os",
        "\n".join([
            "def stat(filename):",
            "    try:",
            "        rstat = os.lstat(filename)",
            "    except:",
            "        rstat = os.stat(filename)",
            "    return tuple(rstat)",
        ]),
        "\n".join([
            "def listdir_stat(dirname):",
            "    try:",
            "        files = os.listdir(dirname)",
            "    except OSError:",
            "        return []",
            "    if dirname in ('', '/'):",
            "        return list((f, stat(f)) for f in files)",
            "    return list((f, stat(dirname + '/' + f)) for f in files)",
        ]),
        # {0!r} emits a properly quoted and escaped string literal, so
        # names containing quotes or backslashes do not break the command
        "print(listdir_stat({0!r}))".format(dirname),
    ]
    out, err = self.__execute(commands)
    if err:
        raise IOError(self.__shortError(err))
    fileslist = ast.literal_eval(out.decode("utf-8"))
    # keep only elements 0, 6 and 8 of each stat tuple
    return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
|
214 |
|
def cd(self, dirname):
    """
    Public method to change the current directory on the connected device.

    @param dirname directory to change to (must not be empty)
    @type str
    @exception IOError raised to indicate an issue with the device
    """
    assert dirname

    commands = [
        "import os",
        # {0!r} emits a properly quoted and escaped string literal, so
        # names containing quotes or backslashes do not break the command
        "os.chdir({0!r})".format(dirname),
    ]
    _out, err = self.__execute(commands)
    if err:
        raise IOError(self.__shortError(err))
67 |
232 |
def pwd(self):
    """
    Public method to get the current directory of the connected device.

    @return current directory
    @rtype str
    @exception IOError raised to indicate an issue with the device
    """
    commands = [
        "import os",
        "print(os.getcwd())",
    ]
    out, err = self.__execute(commands)
    if err:
        raise IOError(self.__shortError(err))
    # strip the line end added by print() on the device
    return out.decode("utf-8").strip()
|
249 |
|
def rm(self, filename):
    """
    Public method to remove a file from the connected device.

    @param filename name of the file to be removed (must not be empty)
    @type str
    @exception IOError raised to indicate an issue with the device
    """
    assert filename

    commands = [
        "import os",
        # {0!r} emits a properly quoted and escaped string literal, so
        # names containing quotes or backslashes do not break the command
        "os.remove({0!r})".format(filename),
    ]
    _out, err = self.__execute(commands)
    if err:
        raise IOError(self.__shortError(err))
|
267 |
|
def mkdir(self, dirname):
    """
    Public method to create a new directory.

    @param dirname name of the directory to create (must not be empty)
    @type str
    @exception IOError raised to indicate an issue with the device
    """
    assert dirname

    commands = [
        "import os",
        # {0!r} emits a properly quoted and escaped string literal, so
        # names containing quotes or backslashes do not break the command
        "os.mkdir({0!r})".format(dirname),
    ]
    _out, err = self.__execute(commands)
    if err:
        raise IOError(self.__shortError(err))
|
285 |
|
def rmdir(self, dirname):
    """
    Public method to remove a directory.

    @param dirname name of the directory to be removed (must not be
        empty)
    @type str
    @exception IOError raised to indicate an issue with the device
    """
    assert dirname

    commands = [
        "import os",
        # {0!r} emits a properly quoted and escaped string literal, so
        # names containing quotes or backslashes do not break the command
        "os.rmdir({0!r})".format(dirname),
    ]
    _out, err = self.__execute(commands)
    if err:
        raise IOError(self.__shortError(err))
76 |
303 |
77 def put(self, hostFileName, deviceFileName): |
304 def put(self, hostFileName, deviceFileName): |
78 """ |
305 """ |
79 Public method to copy a local file to the connected device. |
306 Public method to copy a local file to the connected device. |
80 |
307 |
95 @type str |
323 @type str |
96 @param hostFileName name of the file to copy to |
324 @param hostFileName name of the file to copy to |
97 @type str |
325 @type str |
98 @return flag indicating success |
326 @return flag indicating success |
99 @rtype bool |
327 @rtype bool |
|
328 @exception IOError raised to indicate an issue with the device |
100 """ |
329 """ |
101 # TODO: not implemented yet |
330 # TODO: not implemented yet |
|
331 |
|
332 ################################################################## |
|
333 ## Utility methods below |
|
334 ################################################################## |
|
335 |
|
def mtime2string(self, mtime):
    """
    Public method to convert a time value to a string representation.

    The value is converted with time.localtime() and formatted as
    'YYYY-MM-DD hh:mm:ss'.

    @param mtime time value
    @type int
    @return string representation of the given time
    @rtype str
    """
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime))
|
346 |
|
def mode2string(self, mode):
    """
    Public method to convert a mode value to a string representation.

    The conversion is done by stat.filemode(), i.e. the result has the
    form '-rwxrwxrwx'.

    @param mode mode value
    @type int
    @return string representation of the given mode value
    @rtype str
    """
    return stat.filemode(mode)
|
357 # TODO: remove this |
|
358 ## |
|
359 ##if __name__ == "__main__": |
|
360 ## from PyQt5.QtCore import QCoreApplication, QTimer |
|
361 ## from MicroPythonSerialPort import MicroPythonSerialPort |
|
362 ## |
|
363 ## app = QCoreApplication([]) |
|
364 ## |
|
365 ## serial = MicroPythonSerialPort() |
|
366 ## serial.openSerialLink("/dev/ttyUSB0") |
|
367 ## fs = MicroPythonFileSystem() |
|
368 ## fs.setSerial(serial) |
|
369 ## |
|
370 ## def tf(): |
|
371 ## fs.cd("/flash") |
|
372 ## print(fs.pwd()) |
|
373 ## fs.cd("odroid_go") |
|
374 ## print(fs.pwd()) |
|
375 ## ll = fs.lls() |
|
376 ## print(ll) |
|
377 ## for f, (m, s, t) in ll: |
|
378 ## print(fs.mode2string(m), s, fs.mtime2string(t), f) |
|
379 ## fs.cd("..") |
|
380 ## print(fs.pwd()) |
|
381 ## ll = fs.lls("odroid_go") |
|
382 ## print(ll) |
|
383 ## for f, (m, s, t) in ll: |
|
384 ## print(fs.mode2string(m), s, fs.mtime2string(t), f) |
|
385 ## |
|
386 ## QTimer.singleShot(0, tf) |
|
387 ## app.exec_() |