|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2013 - 2019 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing a SSL error handler. |
|
8 """ |
|
9 |
|
10 from __future__ import unicode_literals |
|
11 |
|
12 import platform |
|
13 |
|
14 from PyQt5.QtCore import QObject, QByteArray |
|
15 from PyQt5.QtNetwork import QSslCertificate, QSslConfiguration, QSslSocket, \ |
|
16 QSslError, QSsl |
|
17 |
|
18 from E5Gui import E5MessageBox |
|
19 |
|
20 import Preferences |
|
21 import Utilities |
|
22 import Globals |
|
23 from Globals import qVersionTuple |
|
24 |
|
25 |
|
26 class E5SslErrorHandler(QObject): |
|
27 """ |
|
28 Class implementing a handler for SSL errors. |
|
29 |
|
30 It also initializes the default SSL configuration with certificates |
|
31 permanently accepted by the user already. |
|
32 """ |
|
33 NotIgnored = 0 |
|
34 SystemIgnored = 1 |
|
35 UserIgnored = 2 |
|
36 |
|
37 def __init__(self, parent=None): |
|
38 """ |
|
39 Constructor |
|
40 |
|
41 @param parent reference to the parent object (QObject) |
|
42 """ |
|
43 super(E5SslErrorHandler, self).__init__(parent) |
|
44 |
|
45 caList = self.__getSystemCaCertificates() |
|
46 if Preferences.Prefs.settings.contains("Help/CaCertificatesDict"): |
|
47 # port old entries stored under 'Help' |
|
48 certificateDict = Globals.toDict( |
|
49 Preferences.Prefs.settings.value("Help/CaCertificatesDict")) |
|
50 Preferences.Prefs.settings.setValue( |
|
51 "Ssl/CaCertificatesDict", certificateDict) |
|
52 Preferences.Prefs.settings.remove("Help/CaCertificatesDict") |
|
53 else: |
|
54 certificateDict = Globals.toDict( |
|
55 Preferences.Prefs.settings.value("Ssl/CaCertificatesDict")) |
|
56 for server in certificateDict: |
|
57 for cert in QSslCertificate.fromData(certificateDict[server]): |
|
58 if cert not in caList: |
|
59 caList.append(cert) |
|
60 sslCfg = QSslConfiguration.defaultConfiguration() |
|
61 sslCfg.setCaCertificates(caList) |
|
62 try: |
|
63 sslProtocol = QSsl.TlsV1_1OrLater |
|
64 if Globals.isWindowsPlatform() and platform.win32_ver()[0] == '7': |
|
65 sslProtocol = QSsl.SecureProtocols |
|
66 except AttributeError: |
|
67 sslProtocol = QSsl.SecureProtocols |
|
68 sslCfg.setProtocol(sslProtocol) |
|
69 try: |
|
70 sslCfg.setSslOption(QSsl.SslOptionDisableCompression, True) |
|
71 except AttributeError: |
|
72 pass |
|
73 QSslConfiguration.setDefaultConfiguration(sslCfg) |
|
74 |
|
75 def sslErrorsReplySlot(self, reply, errors): |
|
76 """ |
|
77 Public slot to handle SSL errors for a network reply. |
|
78 |
|
79 @param reply reference to the reply object (QNetworkReply) |
|
80 @param errors list of SSL errors (list of QSslError) |
|
81 """ |
|
82 self.sslErrorsReply(reply, errors) |
|
83 |
|
84 def sslErrorsReply(self, reply, errors): |
|
85 """ |
|
86 Public slot to handle SSL errors for a network reply. |
|
87 |
|
88 @param reply reference to the reply object (QNetworkReply) |
|
89 @param errors list of SSL errors (list of QSslError) |
|
90 @return tuple indicating to ignore the SSL errors (one of NotIgnored, |
|
91 SystemIgnored or UserIgnored) and indicating a change of the |
|
92 default SSL configuration (boolean) |
|
93 """ |
|
94 url = reply.url() |
|
95 ignore, defaultChanged = self.sslErrors(errors, url.host(), url.port()) |
|
96 if ignore: |
|
97 if defaultChanged: |
|
98 reply.setSslConfiguration( |
|
99 QSslConfiguration.defaultConfiguration()) |
|
100 reply.ignoreSslErrors() |
|
101 else: |
|
102 reply.abort() |
|
103 |
|
104 return ignore, defaultChanged |
|
105 |
|
106 def sslErrors(self, errors, server, port=-1): |
|
107 """ |
|
108 Public method to handle SSL errors. |
|
109 |
|
110 @param errors list of SSL errors (list of QSslError) |
|
111 @param server name of the server (string) |
|
112 @keyparam port value of the port (integer) |
|
113 @return tuple indicating to ignore the SSL errors (one of NotIgnored, |
|
114 SystemIgnored or UserIgnored) and indicating a change of the |
|
115 default SSL configuration (boolean) |
|
116 """ |
|
117 caMerge = {} |
|
118 certificateDict = Globals.toDict( |
|
119 Preferences.Prefs.settings.value("Ssl/CaCertificatesDict")) |
|
120 for caServer in certificateDict: |
|
121 caMerge[caServer] = QSslCertificate.fromData( |
|
122 certificateDict[caServer]) |
|
123 caNew = [] |
|
124 |
|
125 errorStrings = [] |
|
126 if port != -1: |
|
127 server += ":{0:d}".format(port) |
|
128 if errors: |
|
129 for err in errors: |
|
130 if err.error() == QSslError.NoError: |
|
131 continue |
|
132 if server in caMerge and err.certificate() in caMerge[server]: |
|
133 continue |
|
134 errorStrings.append(err.errorString()) |
|
135 if not err.certificate().isNull(): |
|
136 cert = err.certificate() |
|
137 if cert not in caNew: |
|
138 caNew.append(cert) |
|
139 if not errorStrings: |
|
140 return E5SslErrorHandler.SystemIgnored, False |
|
141 |
|
142 errorString = '.</li><li>'.join(errorStrings) |
|
143 ret = E5MessageBox.yesNo( |
|
144 None, |
|
145 self.tr("SSL Errors"), |
|
146 self.tr("""<p>SSL Errors for <br /><b>{0}</b>""" |
|
147 """<ul><li>{1}</li></ul></p>""" |
|
148 """<p>Do you want to ignore these errors?</p>""") |
|
149 .format(server, errorString), |
|
150 icon=E5MessageBox.Warning) |
|
151 |
|
152 if ret: |
|
153 caRet = False |
|
154 if len(caNew) > 0: |
|
155 certinfos = [] |
|
156 for cert in caNew: |
|
157 certinfos.append(self.__certToString(cert)) |
|
158 caRet = E5MessageBox.yesNo( |
|
159 None, |
|
160 self.tr("Certificates"), |
|
161 self.tr( |
|
162 """<p>Certificates:<br/>{0}<br/>""" |
|
163 """Do you want to accept all these certificates?""" |
|
164 """</p>""") |
|
165 .format("".join(certinfos))) |
|
166 if caRet: |
|
167 if server not in caMerge: |
|
168 caMerge[server] = [] |
|
169 for cert in caNew: |
|
170 caMerge[server].append(cert) |
|
171 |
|
172 sslCfg = QSslConfiguration.defaultConfiguration() |
|
173 caList = sslCfg.caCertificates() |
|
174 for cert in caNew: |
|
175 caList.append(cert) |
|
176 sslCfg.setCaCertificates(caList) |
|
177 try: |
|
178 sslCfg.setProtocol(QSsl.TlsV1_1OrLater) |
|
179 except AttributeError: |
|
180 sslCfg.setProtocol(QSsl.SecureProtocols) |
|
181 try: |
|
182 sslCfg.setSslOption(QSsl.SslOptionDisableCompression, |
|
183 True) |
|
184 except AttributeError: |
|
185 pass |
|
186 QSslConfiguration.setDefaultConfiguration(sslCfg) |
|
187 |
|
188 certificateDict = {} |
|
189 for server in caMerge: |
|
190 pems = QByteArray() |
|
191 for cert in caMerge[server]: |
|
192 pems.append(cert.toPem() + b'\n') |
|
193 certificateDict[server] = pems |
|
194 Preferences.Prefs.settings.setValue( |
|
195 "Ssl/CaCertificatesDict", |
|
196 certificateDict) |
|
197 |
|
198 return E5SslErrorHandler.UserIgnored, caRet |
|
199 |
|
200 else: |
|
201 return E5SslErrorHandler.NotIgnored, False |
|
202 |
|
203 def __certToString(self, cert): |
|
204 """ |
|
205 Private method to convert a certificate to a formatted string. |
|
206 |
|
207 @param cert certificate to convert (QSslCertificate) |
|
208 @return formatted string (string) |
|
209 """ |
|
210 result = "<p>" |
|
211 |
|
212 if qVersionTuple() >= (5, 0, 0): |
|
213 result += self.tr("Name: {0}")\ |
|
214 .format(Utilities.html_encode(Utilities.decodeString( |
|
215 ", ".join(cert.subjectInfo(QSslCertificate.CommonName))))) |
|
216 |
|
217 result += self.tr("<br/>Organization: {0}")\ |
|
218 .format(Utilities.html_encode(Utilities.decodeString( |
|
219 ", ".join(cert.subjectInfo( |
|
220 QSslCertificate.Organization))))) |
|
221 |
|
222 result += self.tr("<br/>Issuer: {0}")\ |
|
223 .format(Utilities.html_encode(Utilities.decodeString( |
|
224 ", ".join(cert.issuerInfo(QSslCertificate.CommonName))))) |
|
225 else: |
|
226 result += self.tr("Name: {0}")\ |
|
227 .format(Utilities.html_encode(Utilities.decodeString( |
|
228 cert.subjectInfo(QSslCertificate.CommonName)))) |
|
229 |
|
230 result += self.tr("<br/>Organization: {0}")\ |
|
231 .format(Utilities.html_encode(Utilities.decodeString( |
|
232 cert.subjectInfo(QSslCertificate.Organization)))) |
|
233 |
|
234 result += self.tr("<br/>Issuer: {0}")\ |
|
235 .format(Utilities.html_encode(Utilities.decodeString( |
|
236 cert.issuerInfo(QSslCertificate.CommonName)))) |
|
237 |
|
238 result += self.tr( |
|
239 "<br/>Not valid before: {0}<br/>Valid Until: {1}")\ |
|
240 .format(Utilities.html_encode( |
|
241 cert.effectiveDate().toString("yyyy-MM-dd")), |
|
242 Utilities.html_encode( |
|
243 cert.expiryDate().toString("yyyy-MM-dd"))) |
|
244 |
|
245 result += "</p>" |
|
246 |
|
247 return result |
|
248 |
|
249 def __getSystemCaCertificates(self): |
|
250 """ |
|
251 Private method to get the list of system certificates. |
|
252 |
|
253 @return list of system certificates (list of QSslCertificate) |
|
254 """ |
|
255 caList = QSslCertificate.fromData(Globals.toByteArray( |
|
256 Preferences.Prefs.settings.value("Ssl/SystemCertificates"))) |
|
257 if not caList: |
|
258 caList = QSslSocket.systemCaCertificates() |
|
259 return caList |