-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathasgi_gssapi.py
171 lines (145 loc) · 6.56 KB
/
asgi_gssapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import base64
import logging
import socket
import inspect
from typing import Optional, Callable, Union, List
import gssapi
__version__ = '0.1.2'
logger = logging.getLogger(__name__)
class SPNEGOAuthMiddleware:
def __init__(
self,
app,
service: str = "HTTP",
hostname: Optional[str] = None,
service_principal: Optional[str] = None,
auth_required_callback: Optional[Callable] = None,
gssapi_mech: gssapi.MechType = gssapi.MechType.kerberos,
unauthorized: Optional[Union[str, List[dict]]] = None,
forbidden: Optional[Union[str, List[dict]]] = None,
) -> None:
"""
:param app: ASGI Application.
:param service: Service name (defaults to "HTTP"). Note: those are case-sensitive.
:param hostname: Service host (defaults to `socket.gethostname()`).
:param service_principal: Service principal (defaults to "{service}@{hostname}").
:param auth_required_callback: Optional callback with (scope: dict)->bool signature.
:param gssapi_mech: GSSAPI Auth mechanism, defaults to kerberos.
:param unauthorized: Override 'unauthrozied' response. Can be a list of ASGI events or a string.
Note: if you provide ASGI events here, your HTTP headers **MUST** include
(b"www-authenticate": b"Negotiate").
:param forbidden: Override 'forbidden' response. Can be a list of ASGI events or a string.
"""
self._app = app
if service_principal is not None:
self.service_principal = service_principal
else:
self.service_principal = "{}@{}".format(service, hostname if hostname is not None else socket.gethostname())
self._auth_required_callback = auth_required_callback or (lambda x: True)
self._unauthorized_events = unauthorized
self._forbidden_events = forbidden
# Prepare re-usable GSSAPI objects.
self._service_name = gssapi.Name(self.service_principal, gssapi.NameType.hostbased_service)
self._service_cname = self._service_name.canonicalize(gssapi_mech)
self._service_creds = gssapi.Credentials(name=self._service_cname, usage="accept")
def _error_response(self, status_code: int, headers: dict = None, message: Union[str, bytes] = ""):
if not headers:
headers = {}
if not isinstance(message, bytes):
message = message.encode("utf-8")
length = len(message)
return [
{
"type": "http.response.start",
"status": status_code,
"headers": [
(b"content-length", str(length).encode("ascii")),
(b"content-type", b"text/plain"),
*[(k, v) for k, v in headers.items()],
],
},
{"type": "http.response.body", "body": message},
]
def _unauthorized(self, message: str = ""):
if self._unauthorized_events:
if not isinstance(self._unauthorized_events, (str, bytes)):
return self._unauthorized_events
else:
message = self._unauthorized_events
return self._error_response(401, {b"www-authenticate": b"Negotiate"}, message or "Unauthorized")
def _forbidden(self, message: str = ""):
if self._forbidden_events:
if not isinstance(self._forbidden_events, (str, bytes)):
return self._forbidden_events
else:
message = self._forbidden_events
return self._error_response(403, None, message or "Forbidden")
async def _send_error(self, send, error, message: str = ""):
for event in error(message):
await send(event)
def _gssapi_authenticate(self, ctx: dict, token: str) -> bool:
"""
Invokes GSSAPI SecurityContext and runs auth steps.
:param ctx: a dict to store state.
:param token: base64-encoded input token.
Returns True if authentication was complete,
Returns False if authentication must continue,
Raises an exception on authentication failure.
"""
in_token = base64.b64decode(token)
sec_ctx = gssapi.SecurityContext(creds=self._service_creds)
out_token = sec_ctx.step(in_token)
if out_token:
ctx["token"] = base64.b64encode(out_token).decode("ascii")
if sec_ctx.initiator_name:
ctx["principal"] = str(sec_ctx.initiator_name)
if sec_ctx.delegated_creds:
ctx["delegate_creds"] = sec_ctx.delegated_creds.export()
return sec_ctx.complete
async def __call__(self, scope, receive, send) -> None:
""" ASGI entry-point. """
scope["gssapi"] = ctx = {
"token": None,
"principal": None,
"delegate_creds": None,
}
if scope["type"] != "http":
return await self._app(scope, receive, send)
if inspect.iscoroutinefunction(self._auth_required_callback):
auth_required = await self._auth_required_callback(scope)
else:
auth_required = self._auth_required_callback(scope)
auth_attempted = False
auth_complete = False
www_auth_header = []
headers = {k: v for k, v in scope["headers"] if k == b"authorization"}
header = headers.get(b"authorization", b"").decode("utf-8")
if header:
if header.lower().startswith("negotiate "):
token = header[len("negotiate "):]
auth_attempted = True
try:
auth_complete = self._gssapi_authenticate(ctx, token)
except Exception:
logger.exception("GSSAPI Auth failure.")
async def wrapped_send(event):
if event["type"] == "http.response.start":
event["headers"] = [*event["headers"], *www_auth_header]
await send(event)
# Select response
if auth_complete:
if ctx.get("token", None): # Finish mutual auth
www_auth_header = [
(
b"www-authenticate",
"Negotiate {}".format(ctx["token"]).encode("utf-8"),
)
]
return await self._app(scope, receive, wrapped_send)
elif auth_required:
if auth_attempted:
return await self._send_error(send, self._forbidden)
else:
return await self._send_error(send, self._unauthorized)
else:
return await self._app(scope, receive, send)