Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial TLS 1.3 support #720

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kmip/core/config_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ConfigHelper(object):
FILE_PATH, '../demos/certs/server.key'))
DEFAULT_CA_CERTS = os.path.normpath(os.path.join(
FILE_PATH, '../demos/certs/server.crt'))
DEFAULT_SSL_VERSION = 'PROTOCOL_SSLv23'
DEFAULT_TLS_CLIENT = 'PROTOCOL_TLS_CLIENT'
DEFAULT_USERNAME = None
DEFAULT_PASSWORD = None

Expand Down
58 changes: 58 additions & 0 deletions kmip/services/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,61 @@ def __init__(self, cipher_suites=None):
"""
super(TLS12AuthenticationSuite, self).__init__(cipher_suites)
self._protocol = ssl.PROTOCOL_TLSv1_2


class ClientAuthenticationSuite(AuthenticationSuite):
"""
An authentication suite used to establish secure network connections.

Supports TLS 1.3.
https://docs.openssl.org/3.3/man1/openssl-ciphers/#tls-v13-cipher-suites.
"""

_default_cipher_suites = [
'TLS_AES_128_GCM_SHA256',
'TLS_AES_256_GCM_SHA384',
'TLS_CHACHA20_POLY1305_SHA256',
'TLS_AES_128_CCM_SHA256',
'TLS_AES_128_CCM_8_SHA256'
]

def __init__(self, cipher_suites=None):
"""
Create a ClientAuthenticationSuite object.

Args:
cipher_suites (list): A list of strings representing the names of
cipher suites to use. Overrides the default set of cipher
suites. Optional, defaults to None.
"""
super(ClientAuthenticationSuite, self).__init__(cipher_suites)
self._protocol = ssl.PROTOCOL_TLS_CLIENT


class ServerAuthenticationSuite(AuthenticationSuite):
"""
An authentication suite used to establish secure network connections.

Supports TLS 1.3.
https://docs.openssl.org/3.3/man1/openssl-ciphers/#tls-v13-cipher-suites.
"""

_default_cipher_suites = [
'TLS_AES_128_GCM_SHA256',
'TLS_AES_256_GCM_SHA384',
'TLS_CHACHA20_POLY1305_SHA256',
'TLS_AES_128_CCM_SHA256',
'TLS_AES_128_CCM_8_SHA256'
]

def __init__(self, cipher_suites=None):
"""
Create a ServerAuthenticationSuite object.

Args:
cipher_suites (list): A list of strings representing the names of
cipher suites to use. Overrides the default set of cipher
suites. Optional, defaults to None.
"""
super(ServerAuthenticationSuite, self).__init__(cipher_suites)
self._protocol = ssl.PROTOCOL_TLS_SERVER
18 changes: 11 additions & 7 deletions kmip/services/kmip_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,17 @@ def open(self):
six.reraise(*last_error)

def _create_socket(self, sock):
self.socket = ssl.wrap_socket(
context = ssl.SSLContext(self.ssl_version)
context.verify_mode = self.cert_reqs
if self.ca_certs:
context.load_verify_locations(self.ca_certs)
if self.keyfile and not self.certfile:
raise ValueError("certfile must be specified")
if self.certfile:
context.load_cert_chain(self.certfile, self.keyfile)
self.socket = context.wrap_socket(
sock,
keyfile=self.keyfile,
certfile=self.certfile,
cert_reqs=self.cert_reqs,
ssl_version=self.ssl_version,
ca_certs=self.ca_certs,
server_side=False,
do_handshake_on_connect=self.do_handshake_on_connect,
suppress_ragged_eofs=self.suppress_ragged_eofs)
self.socket.settimeout(self.timeout)
Expand Down Expand Up @@ -1762,7 +1766,7 @@ def _set_variables(self, host, port, keyfile, certfile,
cert_reqs, self.config, 'cert_reqs', 'CERT_REQUIRED'))

self.ssl_version = getattr(ssl, conf.get_valid_value(
ssl_version, self.config, 'ssl_version', conf.DEFAULT_SSL_VERSION))
ssl_version, self.config, 'ssl_version', conf.DEFAULT_TLS_CLIENT))

self.ca_certs = conf.get_valid_value(
ca_certs, self.config, 'ca_certs', conf.DEFAULT_CA_CERTS)
Expand Down
7 changes: 4 additions & 3 deletions kmip/services/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,22 @@ def __init__(self):
self.settings['tls_cipher_suites'] = []
self.settings['logging_level'] = logging.INFO
self.settings['auth_plugins'] = []
self.settings['auth_suite'] = 'TLS_SERVER'

self._expected_settings = [
'hostname',
'port',
'certificate_path',
'key_path',
'ca_path',
'auth_suite'
'ca_path'
]
self._optional_settings = [
'policy_path',
'enable_tls_client_auth',
'tls_cipher_suites',
'logging_level',
'database_path'
'database_path',
'auth_suite'
]

def set_setting(self, setting, value):
Expand Down
30 changes: 21 additions & 9 deletions kmip/services/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ def __init__(
cipher_suites = self.config.settings.get('tls_cipher_suites')
if self.config.settings.get('auth_suite') == 'TLS1.2':
self.auth_suite = auth.TLS12AuthenticationSuite(cipher_suites)
else:
elif self.config.settings.get('auth_suite') == 'Basic':
self.auth_suite = auth.BasicAuthenticationSuite(cipher_suites)
else:
self.auth_suite = auth.ServerAuthenticationSuite(cipher_suites)

self._session_id = 1
self._is_serving = False
Expand Down Expand Up @@ -287,17 +289,27 @@ def interrupt_handler(trigger, frame):
for cipher in auth_suite_ciphers:
self._logger.debug(cipher)

self._socket = ssl.wrap_socket(
cafile = self.config.settings.get('ca_path')
context = ssl.SSLContext(self.auth_suite.protocol)
context.verify_mode = ssl.CERT_REQUIRED
if (self.auth_suite.ciphers and
self.auth_suite.protocol != ssl.PROTOCOL_TLS_SERVER):
context.set_ciphers(self.auth_suite.ciphers)
if cafile:
context.load_verify_locations(cafile)
certfile = self.config.settings.get('certificate_path')

if certfile:
keyfile = self.config.settings.get('key_path')
context.load_cert_chain(certfile, keyfile=keyfile)
else:
raise ValueError("certfile must be specified for server-side operations")

self._socket = context.wrap_socket(
self._socket,
keyfile=self.config.settings.get('key_path'),
certfile=self.config.settings.get('certificate_path'),
server_side=True,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=self.auth_suite.protocol,
ca_certs=self.config.settings.get('ca_path'),
do_handshake_on_connect=False,
suppress_ragged_eofs=True,
ciphers=self.auth_suite.ciphers
suppress_ragged_eofs=True
)

try:
Expand Down
8 changes: 4 additions & 4 deletions kmip/tests/unit/services/server/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_init(self, config_mock, logging_mock):
self.assertTrue(config_mock.called)
self.assertTrue(logging_mock.called)

self.assertIsInstance(s.auth_suite, auth.BasicAuthenticationSuite)
self.assertIsInstance(s.auth_suite, auth.ServerAuthenticationSuite)
self.assertEqual(1, s._session_id)
self.assertFalse(s._is_serving)

Expand Down Expand Up @@ -210,9 +210,9 @@ def test_start(self,
# Test that in ideal cases no errors are generated and the right
# log messages are.
with mock.patch('socket.socket') as socket_mock:
with mock.patch('ssl.wrap_socket') as ssl_mock:
with mock.patch('ssl.SSLContext') as ssl_mock:
socket_mock.return_value = a_mock
ssl_mock.return_value = b_mock
ssl_mock.return_value.wrap_socket.return_value = b_mock

manager_mock.assert_not_called()
monitor_mock.assert_not_called()
Expand Down Expand Up @@ -271,7 +271,7 @@ def test_start(self,

# Test that a NetworkingError is generated if the socket bind fails.
with mock.patch('socket.socket') as socket_mock:
with mock.patch('ssl.wrap_socket') as ssl_mock:
with mock.patch('ssl.SSLContext.wrap_socket') as ssl_mock:
socket_mock.return_value = a_mock
ssl_mock.return_value = b_mock

Expand Down
114 changes: 114 additions & 0 deletions kmip/tests/unit/services/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,117 @@ def test_custom_ciphers_empty(self):
self.assertIn('ECDHE-ECDSA-AES256-GCM-SHA384', suites)
self.assertIn('ECDHE-ECDSA-AES128-SHA256', suites)
self.assertIn('ECDHE-ECDSA-AES256-SHA384', suites)


class TestClientAuthenticationSuite(testtools.TestCase):
"""
A test suite for the ClientAuthenticationSuite.
"""

def setUp(self):
super(TestClientAuthenticationSuite, self).setUp()

def tearDown(self):
super(TestClientAuthenticationSuite, self).tearDown()

def test_init(self):
auth.ClientAuthenticationSuite()

def test_protocol(self):
suite = auth.ClientAuthenticationSuite()
protocol = suite.protocol

self.assertIsInstance(protocol, int)
self.assertEqual(ssl.PROTOCOL_TLS_CLIENT, suite.protocol)

def test_ciphers(self):
suite = auth.ClientAuthenticationSuite()
ciphers = suite.ciphers

self.assertIsInstance(ciphers, str)

cipher_string = ':'.join((
'TLS_AES_128_GCM_SHA256',
'TLS_AES_256_GCM_SHA384',
'TLS_CHACHA20_POLY1305_SHA256',
'TLS_AES_128_CCM_SHA256',
'TLS_AES_128_CCM_8_SHA256',
))

self.assertEqual(cipher_string, ciphers)

def test_custom_ciphers_empty(self):
"""
Test that providing a custom list of cipher suites that ultimately
yields an empty suite list causes the default cipher suite list to
be provided instead.
"""
suite = auth.ClientAuthenticationSuite(
[
'TLS_RSA_WITH_AES_256_CBC_SHA'
]
)
ciphers = suite.ciphers

self.assertIsInstance(ciphers, str)
suites = ciphers.split(':')
self.assertEqual(5, len(suites))
self.assertIn('TLS_AES_128_GCM_SHA256', suites)
self.assertIn('TLS_CHACHA20_POLY1305_SHA256', suites)


class TestServerAuthenticationSuite(testtools.TestCase):
"""
A test suite for the ServerAuthenticationSuite.
"""

def setUp(self):
super(TestServerAuthenticationSuite, self).setUp()

def tearDown(self):
super(TestServerAuthenticationSuite, self).tearDown()

def test_init(self):
auth.ServerAuthenticationSuite()

def test_protocol(self):
suite = auth.ServerAuthenticationSuite()
protocol = suite.protocol

self.assertIsInstance(protocol, int)
self.assertEqual(ssl.PROTOCOL_TLS_SERVER, suite.protocol)

def test_ciphers(self):
suite = auth.ServerAuthenticationSuite()
ciphers = suite.ciphers

self.assertIsInstance(ciphers, str)

cipher_string = ':'.join((
'TLS_AES_128_GCM_SHA256',
'TLS_AES_256_GCM_SHA384',
'TLS_CHACHA20_POLY1305_SHA256',
'TLS_AES_128_CCM_SHA256',
'TLS_AES_128_CCM_8_SHA256',
))

self.assertEqual(cipher_string, ciphers)

def test_custom_ciphers_empty(self):
"""
Test that providing a custom list of cipher suites that ultimately
yields an empty suite list causes the default cipher suite list to
be provided instead.
"""
suite = auth.ServerAuthenticationSuite(
[
'TLS_RSA_WITH_AES_256_CBC_SHA'
]
)
ciphers = suite.ciphers

self.assertIsInstance(ciphers, str)
suites = ciphers.split(':')
self.assertEqual(5, len(suites))
self.assertIn('TLS_AES_128_GCM_SHA256', suites)
self.assertIn('TLS_CHACHA20_POLY1305_SHA256', suites)