|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2013 - 2021 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
Module implementing an SSL error handler.
|
8 """ |
|
9 |
|
10 import contextlib |
|
11 import enum |
|
12 import platform |
|
13 |
|
14 from PyQt5.QtCore import QObject, QByteArray |
|
15 from PyQt5.QtNetwork import ( |
|
16 QSslCertificate, QSslConfiguration, QSslSocket, QSslError, QSsl |
|
17 ) |
|
18 |
|
19 from E5Gui import E5MessageBox |
|
20 |
|
21 import Preferences |
|
22 import Utilities |
|
23 import Globals |
|
24 |
|
25 |
|
class E5SslErrorState(enum.Enum):
    """
    Class defining the SSL error handling states.

    Members of a plain enum.Enum are always truthy. Because the state is
    used in truth tests (e.g. 'if ignore:'), the boolean value is defined
    explicitly: NOT_IGNORED is falsy, SYSTEM_IGNORED and USER_IGNORED are
    truthy.
    """
    # the SSL errors were not ignored
    NOT_IGNORED = 0
    # the SSL errors were ignored without asking the user
    SYSTEM_IGNORED = 1
    # the SSL errors were ignored because the user agreed to it
    USER_IGNORED = 2

    def __bool__(self):
        """
        Special method to evaluate the state in a boolean context.

        @return flag indicating that the SSL errors are ignored
        @rtype bool
        """
        return self is not E5SslErrorState.NOT_IGNORED
|
33 |
|
34 |
|
class E5SslErrorHandler(QObject):
    """
    Class implementing a handler for SSL errors.

    It also initializes the default SSL configuration with certificates
    permanently accepted by the user already.
    """
    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        caList = self.__getSystemCaCertificates()

        settings = Preferences.Prefs.settings
        if settings.contains("Help/CaCertificatesDict"):
            # port old entries stored under 'Help'
            certificateDict = Globals.toDict(
                settings.value("Help/CaCertificatesDict"))
            settings.setValue("Ssl/CaCertificatesDict", certificateDict)
            settings.remove("Help/CaCertificatesDict")
        else:
            certificateDict = Globals.toDict(
                settings.value("Ssl/CaCertificatesDict"))

        # add the certificates accepted by the user to the system ones
        for pems in certificateDict.values():
            for cert in QSslCertificate.fromData(pems):
                if cert not in caList:
                    caList.append(cert)

        self.__setDefaultConfiguration(caList)

    def __preferredProtocol(self):
        """
        Private method to determine the SSL protocol to be configured.

        @return protocol to be set for the default SSL configuration
        @rtype QSsl.SslProtocol
        """
        try:
            protocol = QSsl.SslProtocol.TlsV1_1OrLater
        except AttributeError:
            # the Qt version in use does not know this protocol value
            return QSsl.SslProtocol.SecureProtocols

        if Globals.isWindowsPlatform() and platform.win32_ver()[0] == '7':
            return QSsl.SslProtocol.SecureProtocols

        return protocol

    def __setDefaultConfiguration(self, caList):
        """
        Private method to set the default SSL configuration.

        The CA certificates, the protocol and the compression option are
        set on a copy of the current default configuration, which is
        installed as the new default afterwards.

        @param caList list of CA certificates to be used
        @type list of QSslCertificate
        """
        sslCfg = QSslConfiguration.defaultConfiguration()
        sslCfg.setCaCertificates(caList)
        sslCfg.setProtocol(self.__preferredProtocol())
        # the option may not be available; this is ignored deliberately
        with contextlib.suppress(AttributeError):
            sslCfg.setSslOption(
                QSsl.SslOption.SslOptionDisableCompression, True)
        QSslConfiguration.setDefaultConfiguration(sslCfg)

    def __storeCertificates(self, certificates):
        """
        Private method to save the accepted certificates to the settings.

        @param certificates dictionary with the server as key and the list
            of certificates accepted for it as value
        @type dict of {str: list of QSslCertificate}
        """
        certificateDict = {}
        for caServer, certs in certificates.items():
            pems = QByteArray()
            for cert in certs:
                pems.append(cert.toPem() + b'\n')
            certificateDict[caServer] = pems
        Preferences.Prefs.settings.setValue(
            "Ssl/CaCertificatesDict", certificateDict)

    def sslErrorsReplySlot(self, reply, errors):
        """
        Public slot to handle SSL errors for a network reply.

        This is the same as sslErrorsReply() but without a return value.

        @param reply reference to the reply object
        @type QNetworkReply
        @param errors list of SSL errors
        @type list of QSslError
        """
        self.sslErrorsReply(reply, errors)

    def sslErrorsReply(self, reply, errors):
        """
        Public method to handle SSL errors for a network reply.

        The reply is aborted, if the errors are not ignored. Otherwise it
        is told to ignore the SSL errors (after giving it the changed
        default SSL configuration, if that was modified).

        @param reply reference to the reply object
        @type QNetworkReply
        @param errors list of SSL errors
        @type list of QSslError
        @return tuple indicating to ignore the SSL errors (one of
            E5SslErrorState.NOT_IGNORED, E5SslErrorState.SYSTEM_IGNORED or
            E5SslErrorState.USER_IGNORED) and indicating a change of the
            default SSL configuration
        @rtype tuple of (E5SslErrorState, bool)
        """
        url = reply.url()
        ignore, defaultChanged = self.sslErrors(
            errors, url.host(), url.port())

        # Compare explicitly instead of relying on the truth value of the
        # state; members of a plain enum.Enum are truthy unless the
        # enumeration defines __bool__().
        if ignore is E5SslErrorState.NOT_IGNORED:
            reply.abort()
        else:
            if defaultChanged:
                reply.setSslConfiguration(
                    QSslConfiguration.defaultConfiguration())
            reply.ignoreSslErrors()

        return ignore, defaultChanged

    def sslErrors(self, errors, server, port=-1):
        """
        Public method to handle SSL errors.

        Errors caused by certificates already accepted for the server are
        skipped. For the remaining ones the user is asked whether to
        ignore them and, if certificates are involved, whether to accept
        these certificates. Accepted certificates are added to the default
        SSL configuration and saved to the settings.

        @param errors list of SSL errors
        @type list of QSslError
        @param server name of the server
        @type str
        @param port value of the port
        @type int
        @return tuple indicating to ignore the SSL errors and indicating a
            change of the default SSL configuration
        @rtype tuple of (E5SslErrorState, bool)
        """
        certificateDict = Globals.toDict(
            Preferences.Prefs.settings.value("Ssl/CaCertificatesDict"))
        caMerge = {
            caServer: QSslCertificate.fromData(pems)
            for caServer, pems in certificateDict.items()
        }

        if port != -1:
            server += ":{0:d}".format(port)
        knownCerts = caMerge.get(server, [])

        caNew = []
        errorStrings = []
        for err in errors or []:
            if err.error() == QSslError.SslError.NoError:
                continue
            cert = err.certificate()
            if cert in knownCerts:
                # certificate was accepted for this server already
                continue
            errorStrings.append(err.errorString())
            if not cert.isNull() and cert not in caNew:
                caNew.append(cert)
        if not errorStrings:
            return E5SslErrorState.SYSTEM_IGNORED, False

        # the texts are shown as rich text, so encode them like the
        # certificate data
        errorString = '.</li><li>'.join(
            Utilities.html_encode(text) for text in errorStrings)
        ret = E5MessageBox.yesNo(
            None,
            self.tr("SSL Errors"),
            self.tr("""<p>SSL Errors for <br /><b>{0}</b>"""
                    """<ul><li>{1}</li></ul></p>"""
                    """<p>Do you want to ignore these errors?</p>""")
            .format(Utilities.html_encode(server), errorString),
            icon=E5MessageBox.Warning)
        if not ret:
            return E5SslErrorState.NOT_IGNORED, False

        caRet = False
        if caNew:
            certinfos = "".join(
                self.__certToString(cert) for cert in caNew)
            caRet = E5MessageBox.yesNo(
                None,
                self.tr("Certificates"),
                self.tr(
                    """<p>Certificates:<br/>{0}<br/>"""
                    """Do you want to accept all these certificates?"""
                    """</p>""")
                .format(certinfos))
            if caRet:
                caMerge.setdefault(server, []).extend(caNew)

                caList = (
                    QSslConfiguration.defaultConfiguration()
                    .caCertificates()
                )
                for cert in caNew:
                    if cert not in caList:
                        caList.append(cert)
                self.__setDefaultConfiguration(caList)

                self.__storeCertificates(caMerge)

        return E5SslErrorState.USER_IGNORED, caRet

    def __certToString(self, cert):
        """
        Private method to convert a certificate to a formatted string.

        @param cert certificate to convert
        @type QSslCertificate
        @return formatted string
        @rtype str
        """
        def encode(entries):
            """
            Function to join, decode and HTML encode certificate entries.

            @param entries list of certificate info entries
            @type list of str
            @return encoded string
            @rtype str
            """
            return Utilities.html_encode(
                Utilities.decodeString(", ".join(entries)))

        parts = [
            "<p>",
            self.tr("Name: {0}").format(
                encode(cert.subjectInfo(
                    QSslCertificate.SubjectInfo.CommonName))),
            self.tr("<br/>Organization: {0}").format(
                encode(cert.subjectInfo(
                    QSslCertificate.SubjectInfo.Organization))),
            self.tr("<br/>Issuer: {0}").format(
                encode(cert.issuerInfo(
                    QSslCertificate.SubjectInfo.CommonName))),
            self.tr(
                "<br/>Not valid before: {0}<br/>Valid Until: {1}"
            ).format(
                Utilities.html_encode(
                    cert.effectiveDate().toString("yyyy-MM-dd")),
                Utilities.html_encode(
                    cert.expiryDate().toString("yyyy-MM-dd"))),
            "</p>",
        ]
        return "".join(parts)

    def __getSystemCaCertificates(self):
        """
        Private method to get the list of system certificates.

        The certificates saved in the settings are preferred. If there
        are none, the system CA certificates reported by QSslSocket are
        used.

        @return list of system certificates
        @rtype list of QSslCertificate
        """
        caList = QSslCertificate.fromData(Globals.toByteArray(
            Preferences.Prefs.settings.value("Ssl/SystemCertificates")))
        if not caList:
            caList = QSslSocket.systemCaCertificates()
        return caList