Skip to content

gh-95341: Implement tls-exporter channel bindings and export key materials #95366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions Lib/ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,33 @@ class _TLSMessageType:
CHANGE_CIPHER_SPEC = 0x0101


@_simple_enum(_Enum)
class ChannelBindings:
TLS_UNIQUE = "tls-unique"
TLS_EXPORTER = "tls-exporter"

def _get_channel_binding(self, sslobj):
cls = type(self)
match self:
case cls.TLS_UNIQUE:
if sslobj.version() == "TLSv1.3":
warnings.warn(
"tls-unique channel binding is not specified for TLS 1.3",
DeprecationWarning,
stacklevel=3
)
return sslobj.get_channel_binding(self.value)
case cls.TLS_EXPORTER:
return sslobj.export_keying_material(
32,
"EXPORTER-Channel-Binding",
context="",
require_extms=True
)
case _:
raise ValueError(f"{self!r} channel binding type not implemented")


if sys.platform == "win32":
from _ssl import enum_certificates, enum_crls

Expand All @@ -267,7 +294,7 @@ class _TLSMessageType:

socket_error = OSError # keep that public name in module namespace

CHANNEL_BINDING_TYPES = ['tls-unique']
CHANNEL_BINDING_TYPES = list(cb.value for cb in ChannelBindings)

HAS_NEVER_CHECK_COMMON_NAME = hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT')

Expand Down Expand Up @@ -924,7 +951,16 @@ def get_channel_binding(self, cb_type="tls-unique"):
"""Get channel binding data for current connection. Raise ValueError
if the requested `cb_type` is not supported. Return bytes of the data
or None if the data is not available (e.g. before the handshake)."""
return self._sslobj.get_channel_binding(cb_type)
return ChannelBindings(cb_type)._get_channel_binding(self._sslobj)

def export_keying_material(self, length, label, context=None, require_extms=True):
"""Export keying material for current connection

See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3)
"""
return self._sslobj.export_keying_material(
length, label, context=context, require_extms=require_extms
)

def version(self):
"""Return a string identifying the protocol version used by the
Expand Down Expand Up @@ -1336,14 +1372,23 @@ def accept(self):
@_sslcopydoc
def get_channel_binding(self, cb_type="tls-unique"):
if self._sslobj is not None:
return self._sslobj.get_channel_binding(cb_type)
return ChannelBindings(cb_type)._get_channel_binding(self._sslobj)
else:
if cb_type not in CHANNEL_BINDING_TYPES:
raise ValueError(
"{0} channel binding type not implemented".format(cb_type)
)
return None

@_sslcopydoc
def export_keying_material(self, length, label, context=None, require_extms=True):
if self._sslobj is not None:
return self._sslobj.export_keying_material(
length, label, context=context, require_extms=require_extms
)
else:
return None

@_sslcopydoc
def version(self):
if self._sslobj is not None:
Expand Down
93 changes: 62 additions & 31 deletions Lib/test/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,17 +668,17 @@ def test_unknown_channel_binding(self):
ss.get_channel_binding("unknown-type")
s.close()

@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
"'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
def test_tls_channel_binding(self):
# unconnected should return None for known type
s = socket.socket(socket.AF_INET)
with test_wrap_socket(s) as ss:
self.assertIsNone(ss.get_channel_binding("tls-unique"))
self.assertIsNone(ss.get_channel_binding("tls-exporter"))
# the same for server-side
s = socket.socket(socket.AF_INET)
with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
self.assertIsNone(ss.get_channel_binding("tls-unique"))
self.assertIsNone(ss.get_channel_binding("tls-exporter"))

def test_dealloc_warn(self):
ss = test_wrap_socket(socket.socket(socket.AF_INET))
Expand Down Expand Up @@ -2086,15 +2086,15 @@ def test_bio_handshake(self):
self.assertIsNone(sslobj.version())
self.assertIsNotNone(sslobj.shared_ciphers())
self.assertRaises(ValueError, sslobj.getpeercert)
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
for cb in ssl.CHANNEL_BINDING_TYPES:
self.assertIsNone(sslobj.get_channel_binding(cb))
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
self.assertTrue(sslobj.cipher())
self.assertIsNotNone(sslobj.shared_ciphers())
self.assertIsNotNone(sslobj.version())
self.assertTrue(sslobj.getpeercert())
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertTrue(sslobj.get_channel_binding('tls-unique'))
self.assertTrue(sslobj.get_channel_binding("tls-unique"))
self.assertTrue(sslobj.get_channel_binding("tls-exporter"))
try:
self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
except ssl.SSLSyscallError:
Expand Down Expand Up @@ -2317,6 +2317,12 @@ def run(self):
sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
data = self.sslconn.get_channel_binding("tls-unique")
self.write(repr(data).encode("us-ascii") + b"\n")
elif stripped == b'CB tls-exporter':
if support.verbose and self.server.connectionchatty:
sys.stdout.write(" server: read CB tls-exporter from client, sending our CB data...\n")
data = self.sslconn.get_channel_binding("tls-exporter")
self.write(repr(data).encode("us-ascii") + b"\n")

elif stripped == b'PHA':
if support.verbose and self.server.connectionchatty:
sys.stdout.write(" server: initiating post handshake auth\n")
Expand Down Expand Up @@ -3737,14 +3743,31 @@ def test_default_ecdh_curve(self):
s.connect((HOST, server.port))
self.assertIn("ECDH", s.cipher()[0])

@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
"'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
"""Test tls-unique channel binding."""
def test_tls_channel_binding_unique_tlsv1_2(self):
self._test_tls_channel_binding(ssl.TLSVersion.TLSv1_2, "tls-unique", 12)

def test_tls_channel_binding_unique_tlsv1_3(self):
with warnings_helper.check_warnings(("tls-unique", DeprecationWarning)):
self._test_tls_channel_binding(ssl.TLSVersion.TLSv1_3, "tls-unique", 48)

def test_tls_channel_binding_exporter_tlsv1_2(self):
self._test_tls_channel_binding(
ssl.TLSVersion.TLSv1_2, "tls-exporter", 32, "EXPORTER-Channel-Binding"
)

def test_tls_channel_binding_exporter_tlsv1_3(self):
self._test_tls_channel_binding(
ssl.TLSVersion.TLSv1_3, "tls-exporter", 32, "EXPORTER-Channel-Binding"
)

def _test_tls_channel_binding(self, version, cb, cb_size, ekm=None):
"""Test tls-unique and tls-export channel binding."""
if support.verbose:
sys.stdout.write("\n")

client_context, server_context, hostname = testing_context()
server_context.minimum_version = version
server_context.maximum_version = version

server = ThreadedEchoServer(context=server_context,
chatty=True,
Expand All @@ -3756,45 +3779,53 @@ def test_tls_unique_channel_binding(self):
server_hostname=hostname) as s:
s.connect((HOST, server.port))
# get the data
cb_data = s.get_channel_binding("tls-unique")
cb_data = s.get_channel_binding(cb)
if support.verbose:
sys.stdout.write(
" got channel binding data: {0!r}\n".format(cb_data))
f" got {cb} channel binding data: {cb_data!r}\n"
)

if ekm:
cb_ekm = s.export_keying_material(
cb_size, ekm, context="", require_extms=True
)
self.assertEqual(cb_ekm, cb_data)
# TLS 1.3: empty and no context result in equal values
# other: empty context and no context result in different values
cb_ekm_no_context = s.export_keying_material(cb_size, ekm)
if version == ssl.TLSVersion.TLSv1_3:
self.assertEqual(cb_data, cb_ekm_no_context)
else:
self.assertNotEqual(cb_data, cb_ekm_no_context)

# check if it is sane
self.assertIsNotNone(cb_data)
if s.version() == 'TLSv1.3':
self.assertEqual(len(cb_data), 48)
else:
self.assertEqual(len(cb_data), 12) # True for TLSv1
self.assertEqual(len(cb_data), cb_size)

# and compare with the peers version
s.write(b"CB tls-unique\n")
peer_data_repr = s.read().strip()
self.assertEqual(peer_data_repr,
s.write(f"CB {cb}\n".encode("ascii"))
peer_data = s.read().strip()
self.assertEqual(peer_data,
repr(cb_data).encode("us-ascii"))

# now, again
with client_context.wrap_socket(
socket.socket(),
server_hostname=hostname) as s:
s.connect((HOST, server.port))
new_cb_data = s.get_channel_binding("tls-unique")
new_cb_data = s.get_channel_binding(cb)
if support.verbose:
sys.stdout.write(
"got another channel binding data: {0!r}\n".format(
new_cb_data)
f" got another {cb} channel binding data: {new_cb_data!r}\n"
)
# is it really unique
self.assertNotEqual(cb_data, new_cb_data)
self.assertIsNotNone(cb_data)
if s.version() == 'TLSv1.3':
self.assertEqual(len(cb_data), 48)
else:
self.assertEqual(len(cb_data), 12) # True for TLSv1
s.write(b"CB tls-unique\n")
peer_data_repr = s.read().strip()
self.assertEqual(peer_data_repr,
self.assertIsNotNone(new_cb_data)
self.assertEqual(len(cb_data), cb_size)

s.write(f"CB {cb}\n".encode("ascii"))
peer_unique= s.read().strip()
self.assertEqual(peer_unique,
repr(new_cb_data).encode("us-ascii"))

def test_compression(self):
Expand Down
79 changes: 70 additions & 9 deletions Modules/_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -2699,10 +2699,9 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self,
const char *cb_type)
/*[clinic end generated code: output=34bac9acb6a61d31 input=08b7e43b99c17d41]*/
{
char buf[PySSL_CB_MAXLEN];
size_t len;

if (strcmp(cb_type, "tls-unique") == 0) {
char buf[PySSL_CB_MAXLEN];
size_t len;
if (SSL_session_reused(self->ssl) ^ !self->socket_type) {
/* if session is resumed XOR we are the client */
len = SSL_get_finished(self->ssl, buf, PySSL_CB_MAXLEN);
Expand All @@ -2711,21 +2710,82 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self,
/* if a new session XOR we are the server */
len = SSL_get_peer_finished(self->ssl, buf, PySSL_CB_MAXLEN);
}
}
else {
/* It cannot be negative in current OpenSSL version as of July 2011 */
if (len == 0)
Py_RETURN_NONE;

return PyBytes_FromStringAndSize(buf, len);
} else {
PyErr_Format(
PyExc_ValueError,
"'%s' channel binding type not implemented",
cb_type
);
return NULL;
}
}

/* It cannot be negative in current OpenSSL version as of July 2011 */
if (len == 0)
Py_RETURN_NONE;
/*[clinic input]
_ssl._SSLSocket.export_keying_material
length: Py_ssize_t
label: str(accept={str, robuffer}, zeroes=True)
*
context: str(accept={str, robuffer, NoneType}, zeroes=True) = None
require_extms: bool = True

Get keying material for current connection.

See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3)
[clinic start generated code]*/

static PyObject *
_ssl__SSLSocket_export_keying_material_impl(PySSLSocket *self,
Py_ssize_t length,
const char *label,
Py_ssize_t label_length,
const char *context,
Py_ssize_t context_length,
int require_extms)
/*[clinic end generated code: output=7ff9f8a13d326773 input=1ed86925a1a8c634]*/
{
PyObject *km;

return PyBytes_FromStringAndSize(buf, len);
if (!SSL_is_init_finished(self->ssl)) {
Py_RETURN_NONE;
}
if (length < 1) {
PyErr_SetString(PyExc_ValueError, "invalid export length");
return NULL;
}
if (require_extms) {
// RFC 9266 requires extended master secret for tls-exporter
// channel binding. EMS is always present with TLS 1.3 and an
// optional extension with TLS 1.2.
if (SSL_version(self->ssl) != TLS1_3_VERSION) {
int res = SSL_get_extms_support(self->ssl);
if (res == -1) {
return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__);
}
if (res == 0) {
PyErr_SetString(PyExc_ValueError, "connect has no extended master secret");
return NULL;
}
}
}
km = PyBytes_FromStringAndSize(NULL, length);
if (km == NULL) {
return NULL;
}
if (SSL_export_keying_material(
self->ssl,
(unsigned char*)PyBytes_AS_STRING(km), (size_t)length,
label, (size_t)label_length,
(const unsigned char*)context, (size_t)context_length,
(context != NULL)) != 1) {
Py_DECREF(km);
return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__);
}
return km;
}

/*[clinic input]
Expand Down Expand Up @@ -2918,6 +2978,7 @@ static PyMethodDef PySSLMethods[] = {
_SSL__SSLSOCKET_VERIFY_CLIENT_POST_HANDSHAKE_METHODDEF
_SSL__SSLSOCKET_GET_UNVERIFIED_CHAIN_METHODDEF
_SSL__SSLSOCKET_GET_VERIFIED_CHAIN_METHODDEF
_SSL__SSLSOCKET_EXPORT_KEYING_MATERIAL_METHODDEF
{NULL, NULL}
};

Expand Down
Loading