[1.3.X] Fixed #18856 -- Ensured that redirects can't be poisoned by malicious users.

This commit is contained in:
Florian Apolloner 2012-11-17 22:00:53 +01:00
parent 6383d2358c
commit 1515eb46da
9 changed files with 164 additions and 53 deletions

View File

@ -5,7 +5,7 @@ from django.core.urlresolvers import reverse
from django.http import HttpResponseRedirect, QueryDict from django.http import HttpResponseRedirect, QueryDict
from django.shortcuts import render_to_response from django.shortcuts import render_to_response
from django.template import RequestContext from django.template import RequestContext
from django.utils.http import base36_to_int from django.utils.http import base36_to_int, is_safe_url
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from django.views.decorators.cache import never_cache from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_protect from django.views.decorators.csrf import csrf_protect
@ -33,18 +33,11 @@ def login(request, template_name='registration/login.html',
if request.method == "POST": if request.method == "POST":
form = authentication_form(data=request.POST) form = authentication_form(data=request.POST)
if form.is_valid(): if form.is_valid():
netloc = urlparse.urlparse(redirect_to)[1] # Ensure the user-originating redirection url is safe.
if not is_safe_url(url=redirect_to, host=request.get_host()):
# Use default setting if redirect_to is empty
if not redirect_to:
redirect_to = settings.LOGIN_REDIRECT_URL redirect_to = settings.LOGIN_REDIRECT_URL
# Security check -- don't allow redirection to a different # Okay, security check complete. Log the user in.
# host.
elif netloc and netloc != request.get_host():
redirect_to = settings.LOGIN_REDIRECT_URL
# Okay, security checks complete. Log the user in.
auth_login(request, form.get_user()) auth_login(request, form.get_user())
if request.session.test_cookie_worked(): if request.session.test_cookie_worked():
@ -76,26 +69,27 @@ def logout(request, next_page=None,
Logs out the user and displays 'You are logged out' message. Logs out the user and displays 'You are logged out' message.
""" """
auth_logout(request) auth_logout(request)
redirect_to = request.REQUEST.get(redirect_field_name, '')
if redirect_to:
netloc = urlparse.urlparse(redirect_to)[1]
# Security check -- don't allow redirection to a different host.
if not (netloc and netloc != request.get_host()):
return HttpResponseRedirect(redirect_to)
if next_page is None: if redirect_field_name in request.REQUEST:
current_site = get_current_site(request) next_page = request.REQUEST[redirect_field_name]
context = { # Security check -- don't allow redirection to a different host.
'site': current_site, if not is_safe_url(url=next_page, host=request.get_host()):
'site_name': current_site.name, next_page = request.path
'title': _('Logged out')
} if next_page:
context.update(extra_context or {})
return render_to_response(template_name, context,
context_instance=RequestContext(request, current_app=current_app))
else:
# Redirect to this page until the session has been cleared. # Redirect to this page until the session has been cleared.
return HttpResponseRedirect(next_page or request.path) return HttpResponseRedirect(next_page)
current_site = get_current_site(request)
context = {
'site': current_site,
'site_name': current_site.name,
'title': _('Logged out')
}
if extra_context is not None:
context.update(extra_context)
return render_to_response(template_name, context,
context_instance=RequestContext(request, current_app=current_app))
def logout_then_login(request, login_url=None, current_app=None, extra_context=None): def logout_then_login(request, login_url=None, current_app=None, extra_context=None):
""" """

View File

@ -40,9 +40,6 @@ def post_comment(request, next=None, using=None):
if not data.get('email', ''): if not data.get('email', ''):
data["email"] = request.user.email data["email"] = request.user.email
# Check to see if the POST data overrides the view's next argument.
next = data.get("next", next)
# Look up the object we're trying to comment about # Look up the object we're trying to comment about
ctype = data.get("content_type") ctype = data.get("content_type")
object_pk = data.get("object_pk") object_pk = data.get("object_pk")
@ -94,9 +91,9 @@ def post_comment(request, next=None, using=None):
] ]
return render_to_response( return render_to_response(
template_list, { template_list, {
"comment" : form.data.get("comment", ""), "comment": form.data.get("comment", ""),
"form" : form, "form": form,
"next": next, "next": data.get("next", next),
}, },
RequestContext(request, {}) RequestContext(request, {})
) )
@ -127,7 +124,7 @@ def post_comment(request, next=None, using=None):
request = request request = request
) )
return next_redirect(data, next, comment_done, c=comment._get_pk_val()) return next_redirect(request, next, comment_done, c=comment._get_pk_val())
comment_done = confirmation_view( comment_done = confirmation_view(
template = "comments/posted.html", template = "comments/posted.html",

View File

@ -7,6 +7,7 @@ from django.contrib import comments
from django.contrib.comments import signals from django.contrib.comments import signals
from django.views.decorators.csrf import csrf_protect from django.views.decorators.csrf import csrf_protect
@csrf_protect @csrf_protect
@login_required @login_required
def flag(request, comment_id, next=None): def flag(request, comment_id, next=None):
@ -23,7 +24,7 @@ def flag(request, comment_id, next=None):
# Flag on POST # Flag on POST
if request.method == 'POST': if request.method == 'POST':
perform_flag(request, comment) perform_flag(request, comment)
return next_redirect(request.POST.copy(), next, flag_done, c=comment.pk) return next_redirect(request, next, flag_done, c=comment.pk)
# Render a form on GET # Render a form on GET
else: else:
@ -50,7 +51,7 @@ def delete(request, comment_id, next=None):
if request.method == 'POST': if request.method == 'POST':
# Flag the comment as deleted instead of actually deleting it. # Flag the comment as deleted instead of actually deleting it.
perform_delete(request, comment) perform_delete(request, comment)
return next_redirect(request.POST.copy(), next, delete_done, c=comment.pk) return next_redirect(request, next, delete_done, c=comment.pk)
# Render a form on GET # Render a form on GET
else: else:
@ -77,7 +78,7 @@ def approve(request, comment_id, next=None):
if request.method == 'POST': if request.method == 'POST':
# Flag the comment as approved. # Flag the comment as approved.
perform_approve(request, comment) perform_approve(request, comment)
return next_redirect(request.POST.copy(), next, approve_done, c=comment.pk) return next_redirect(request, next, approve_done, c=comment.pk)
# Render a form on GET # Render a form on GET
else: else:

View File

@ -4,14 +4,15 @@ A few bits of helper functions for comment views.
import urllib import urllib
import textwrap import textwrap
from django.http import HttpResponseRedirect
from django.core import urlresolvers from django.core import urlresolvers
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response from django.shortcuts import render_to_response
from django.template import RequestContext from django.template import RequestContext
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.contrib import comments from django.contrib import comments
from django.utils.http import is_safe_url
def next_redirect(data, default, default_view, **get_kwargs): def next_redirect(request, default, default_view, **get_kwargs):
""" """
Handle the "where should I go next?" part of comment views. Handle the "where should I go next?" part of comment views.
@ -21,9 +22,10 @@ def next_redirect(data, default, default_view, **get_kwargs):
Returns an ``HttpResponseRedirect``. Returns an ``HttpResponseRedirect``.
""" """
next = data.get("next", default) next = request.POST.get('next', default)
if next is None: if not is_safe_url(url=next, host=request.get_host()):
next = urlresolvers.reverse(default_view) next = urlresolvers.reverse(default_view)
if get_kwargs: if get_kwargs:
if '#' in next: if '#' in next:
tmp = next.rsplit('#', 1) tmp = next.rsplit('#', 1)

View File

@ -204,3 +204,15 @@ else:
""" """
p1, p2 = urlparse.urlparse(url1), urlparse.urlparse(url2) p1, p2 = urlparse.urlparse(url1), urlparse.urlparse(url2)
return p1[0:2] == p2[0:2] return p1[0:2] == p2[0:2]
def is_safe_url(url, host=None):
"""
Return ``True`` if the url is a safe redirection (i.e. it doesn't point to
a different host).
Always returns ``False`` on an empty url.
"""
if not url:
return False
netloc = urlparse.urlparse(url)[1]
return not netloc or netloc == host

View File

@ -8,6 +8,8 @@ from django.utils.translation import check_for_language, activate, to_locale, ge
from django.utils.text import javascript_quote from django.utils.text import javascript_quote
from django.utils.encoding import smart_unicode from django.utils.encoding import smart_unicode
from django.utils.formats import get_format_modules, get_format from django.utils.formats import get_format_modules, get_format
from django.utils.http import is_safe_url
def set_language(request): def set_language(request):
""" """
@ -20,11 +22,11 @@ def set_language(request):
redirect to the page in the request (the 'next' parameter) without changing redirect to the page in the request (the 'next' parameter) without changing
any state. any state.
""" """
next = request.REQUEST.get('next', None) next = request.REQUEST.get('next')
if not next: if not is_safe_url(url=next, host=request.get_host()):
next = request.META.get('HTTP_REFERER', None) next = request.META.get('HTTP_REFERER')
if not next: if not is_safe_url(url=next, host=request.get_host()):
next = '/' next = '/'
response = http.HttpResponseRedirect(next) response = http.HttpResponseRedirect(next)
if request.method == 'POST': if request.method == 'POST':
lang_code = request.POST.get('language', None) lang_code = request.POST.get('language', None)

View File

@ -217,6 +217,13 @@ class CommentViewTests(CommentTestCase):
match = re.search(r"^http://testserver/somewhere/else/\?c=\d+$", location) match = re.search(r"^http://testserver/somewhere/else/\?c=\d+$", location)
self.assertTrue(match != None, "Unexpected redirect location: %s" % location) self.assertTrue(match != None, "Unexpected redirect location: %s" % location)
data["next"] = "http://badserver/somewhere/else/"
data["comment"] = "This is another comment with an unsafe next url"
response = self.client.post("/post/", data)
location = response["Location"]
match = post_redirect_re.match(location)
self.assertTrue(match != None, "Unsafe redirection to: %s" % location)
def testCommentDoneView(self): def testCommentDoneView(self):
a = Article.objects.get(pk=1) a = Article.objects.get(pk=1)
data = self.getValidData(a) data = self.getValidData(a)

View File

@ -27,6 +27,30 @@ class FlagViewTests(CommentTestCase):
self.assertEqual(c.flags.filter(flag=CommentFlag.SUGGEST_REMOVAL).count(), 1) self.assertEqual(c.flags.filter(flag=CommentFlag.SUGGEST_REMOVAL).count(), 1)
return c return c
def testFlagPostNext(self):
"""
POST the flag view, explicitly providing a next url.
"""
comments = self.createSomeComments()
pk = comments[0].pk
self.client.login(username="normaluser", password="normaluser")
response = self.client.post("/flag/%d/" % pk, {'next': "/go/here/"})
self.assertEqual(response["Location"],
"http://testserver/go/here/?c=1")
def testFlagPostUnsafeNext(self):
"""
POSTing to the flag view with an unsafe next url will ignore the
provided url when redirecting.
"""
comments = self.createSomeComments()
pk = comments[0].pk
self.client.login(username="normaluser", password="normaluser")
response = self.client.post("/flag/%d/" % pk,
{'next': "http://elsewhere/bad"})
self.assertEqual(response["Location"],
"http://testserver/flagged/?c=%d" % pk)
def testFlagPostTwice(self): def testFlagPostTwice(self):
"""Users don't get to flag comments more than once.""" """Users don't get to flag comments more than once."""
c = self.testFlagPost() c = self.testFlagPost()
@ -46,7 +70,7 @@ class FlagViewTests(CommentTestCase):
def testFlaggedView(self): def testFlaggedView(self):
comments = self.createSomeComments() comments = self.createSomeComments()
pk = comments[0].pk pk = comments[0].pk
response = self.client.get("/flagged/", data={"c":pk}) response = self.client.get("/flagged/", data={"c": pk})
self.assertTemplateUsed(response, "comments/flagged.html") self.assertTemplateUsed(response, "comments/flagged.html")
def testFlagSignals(self): def testFlagSignals(self):
@ -98,6 +122,33 @@ class DeleteViewTests(CommentTestCase):
self.assertTrue(c.is_removed) self.assertTrue(c.is_removed)
self.assertEqual(c.flags.filter(flag=CommentFlag.MODERATOR_DELETION, user__username="normaluser").count(), 1) self.assertEqual(c.flags.filter(flag=CommentFlag.MODERATOR_DELETION, user__username="normaluser").count(), 1)
def testDeletePostNext(self):
"""
POSTing the delete view will redirect to an explicitly provided a next
url.
"""
comments = self.createSomeComments()
pk = comments[0].pk
makeModerator("normaluser")
self.client.login(username="normaluser", password="normaluser")
response = self.client.post("/delete/%d/" % pk, {'next': "/go/here/"})
self.assertEqual(response["Location"],
"http://testserver/go/here/?c=1")
def testDeletePostUnsafeNext(self):
"""
POSTing to the delete view with an unsafe next url will ignore the
provided url when redirecting.
"""
comments = self.createSomeComments()
pk = comments[0].pk
makeModerator("normaluser")
self.client.login(username="normaluser", password="normaluser")
response = self.client.post("/delete/%d/" % pk,
{'next': "http://elsewhere/bad"})
self.assertEqual(response["Location"],
"http://testserver/deleted/?c=%d" % pk)
def testDeleteSignals(self): def testDeleteSignals(self):
def receive(sender, **kwargs): def receive(sender, **kwargs):
received_signals.append(kwargs.get('signal')) received_signals.append(kwargs.get('signal'))
@ -113,13 +164,13 @@ class DeleteViewTests(CommentTestCase):
def testDeletedView(self): def testDeletedView(self):
comments = self.createSomeComments() comments = self.createSomeComments()
pk = comments[0].pk pk = comments[0].pk
response = self.client.get("/deleted/", data={"c":pk}) response = self.client.get("/deleted/", data={"c": pk})
self.assertTemplateUsed(response, "comments/deleted.html") self.assertTemplateUsed(response, "comments/deleted.html")
class ApproveViewTests(CommentTestCase): class ApproveViewTests(CommentTestCase):
def testApprovePermissions(self): def testApprovePermissions(self):
"""The delete view should only be accessible to 'moderators'""" """The approve view should only be accessible to 'moderators'"""
comments = self.createSomeComments() comments = self.createSomeComments()
pk = comments[0].pk pk = comments[0].pk
self.client.login(username="normaluser", password="normaluser") self.client.login(username="normaluser", password="normaluser")
@ -131,7 +182,7 @@ class ApproveViewTests(CommentTestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
def testApprovePost(self): def testApprovePost(self):
"""POSTing the delete view should mark the comment as removed""" """POSTing the approve view should mark the comment as removed"""
c1, c2, c3, c4 = self.createSomeComments() c1, c2, c3, c4 = self.createSomeComments()
c1.is_public = False; c1.save() c1.is_public = False; c1.save()
@ -143,6 +194,36 @@ class ApproveViewTests(CommentTestCase):
self.assertTrue(c.is_public) self.assertTrue(c.is_public)
self.assertEqual(c.flags.filter(flag=CommentFlag.MODERATOR_APPROVAL, user__username="normaluser").count(), 1) self.assertEqual(c.flags.filter(flag=CommentFlag.MODERATOR_APPROVAL, user__username="normaluser").count(), 1)
def testApprovePostNext(self):
"""
POSTing the approve view will redirect to an explicitly provided a next
url.
"""
c1, c2, c3, c4 = self.createSomeComments()
c1.is_public = False; c1.save()
makeModerator("normaluser")
self.client.login(username="normaluser", password="normaluser")
response = self.client.post("/approve/%d/" % c1.pk,
{'next': "/go/here/"})
self.assertEqual(response["Location"],
"http://testserver/go/here/?c=1")
def testApprovePostUnsafeNext(self):
"""
POSTing to the approve view with an unsafe next url will ignore the
provided url when redirecting.
"""
c1, c2, c3, c4 = self.createSomeComments()
c1.is_public = False; c1.save()
makeModerator("normaluser")
self.client.login(username="normaluser", password="normaluser")
response = self.client.post("/approve/%d/" % c1.pk,
{'next': "http://elsewhere/bad"})
self.assertEqual(response["Location"],
"http://testserver/approved/?c=%d" % c1.pk)
def testApproveSignals(self): def testApproveSignals(self):
def receive(sender, **kwargs): def receive(sender, **kwargs):
received_signals.append(kwargs.get('signal')) received_signals.append(kwargs.get('signal'))

View File

@ -13,13 +13,28 @@ class I18NTests(TestCase):
""" Tests django views in django/views/i18n.py """ """ Tests django views in django/views/i18n.py """
def test_setlang(self): def test_setlang(self):
"""The set_language view can be used to change the session language""" """
The set_language view can be used to change the session language.
The user is redirected to the 'next' argument if provided.
"""
for lang_code, lang_name in settings.LANGUAGES: for lang_code, lang_name in settings.LANGUAGES:
post_data = dict(language=lang_code, next='/views/') post_data = dict(language=lang_code, next='/views/')
response = self.client.post('/views/i18n/setlang/', data=post_data) response = self.client.post('/views/i18n/setlang/', data=post_data)
self.assertRedirects(response, 'http://testserver/views/') self.assertRedirects(response, 'http://testserver/views/')
self.assertEqual(self.client.session['django_language'], lang_code) self.assertEqual(self.client.session['django_language'], lang_code)
def test_setlang_unsafe_next(self):
"""
The set_language view only redirects to the 'next' argument if it is
"safe".
"""
lang_code, lang_name = settings.LANGUAGES[0]
post_data = dict(language=lang_code, next='//unsafe/redirection/')
response = self.client.post('/views/i18n/setlang/', data=post_data)
self.assertEqual(response['Location'], 'http://testserver/')
self.assertEqual(self.client.session['django_language'], lang_code)
def test_jsi18n(self): def test_jsi18n(self):
"""The javascript_catalog can be deployed with language settings""" """The javascript_catalog can be deployed with language settings"""
for lang_code in ['es', 'fr', 'ru']: for lang_code in ['es', 'fr', 'ru']: