Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9f53443188 |
+1
-2
@@ -1,11 +1,10 @@
|
||||
language: python
|
||||
python:
|
||||
- "2.6"
|
||||
- "2.7"
|
||||
|
||||
install:
|
||||
- "pip install -r requirements.dev.txt"
|
||||
- "[[ $TRAVIS_PYTHON_VERSION = '2.6' ]] && pip install unittest2 || echo"
|
||||
- "pip install ."
|
||||
|
||||
|
||||
script: nosetests
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
pydiscourse
|
||||
------------
|
||||
A Python library for working with Discourse.
|
||||
|
||||
A Python library for the Discourse API.
|
||||
Its pretty basic right now but you need to start somewhere.
|
||||
|
||||
Examples
|
||||
@@ -23,16 +22,6 @@ Create a new user::
|
||||
|
||||
user = client.create_user('The Black Knight', 'blacknight', 'knight@python.org', 'justafleshwound')
|
||||
|
||||
Implement SSO for Discourse with your Python server::
|
||||
|
||||
@login_required
|
||||
def discourse_sso_view(request):
|
||||
payload = request.GET.get('sso')
|
||||
signature = request.GET.get('sig')
|
||||
nonce = sso_validate(payload, signature, SECRET)
|
||||
url = sso_redirect_url(nonce, SECRET, request.user.email, request.user.id, request.user.username)
|
||||
return redirect('http://discuss.example.com' + url)
|
||||
|
||||
Command line
|
||||
----------------
|
||||
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
""" A higher level API wrapper for communicating with a Discourse instance
|
||||
|
||||
EXPERIMENTAL, subject to complete and radical change
|
||||
|
||||
Goal
|
||||
------
|
||||
A pythonic wrapper around the discourse API that minimizes requests by lazy loading of data.
|
||||
|
||||
"""
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def datetime_from(date):
|
||||
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
|
||||
# XXX not handling timezone, but we're not using it yet
|
||||
return datetime.strptime(date[:-6], DATE_FORMAT)
|
||||
|
||||
|
||||
class DiscourseUser(object):
|
||||
username = None
|
||||
userid = None
|
||||
avatar_template = None
|
||||
default_avatar_size = 40
|
||||
|
||||
def avatar(self, size=None):
|
||||
if size is None:
|
||||
size = self.default_avatar_size
|
||||
return self.avatar_template.replace(u'{size}', unicode(size))
|
||||
|
||||
def __repr__(self):
|
||||
return '<DiscourseUser {0} {1}>'.format(self.userid, self.username)
|
||||
|
||||
@classmethod
|
||||
def from_summary(cls, summary):
|
||||
instance = cls()
|
||||
instance.username = summary['username']
|
||||
instance.userid = summary['id']
|
||||
instance.avatar_template = summary['avatar_template']
|
||||
|
||||
return instance
|
||||
|
||||
|
||||
class DiscourseUserSet(object):
|
||||
def __init__(self, users):
|
||||
self.users = users
|
||||
|
||||
self.byname = {u.username: u for u in users}
|
||||
self.byid = {u.userid: u for u in users}
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.users)
|
||||
|
||||
def __getitem__(self, item):
|
||||
try:
|
||||
return self.byname[item]
|
||||
except KeyError:
|
||||
return self.byid[item]
|
||||
|
||||
@classmethod
|
||||
def from_response(cls, response):
|
||||
users = [DiscourseUser.from_summary(u) for u in response.get('users', [])]
|
||||
return cls(users)
|
||||
|
||||
|
||||
class DiscoursePost(object):
|
||||
default_avatar_size = 40
|
||||
|
||||
def avatar(self, size=None):
|
||||
if size is None:
|
||||
size = self.default_avatar_size
|
||||
return self.avatar_template.replace(u'{size}', unicode(size))
|
||||
|
||||
def date_created(self):
|
||||
return datetime_from(self.data['created_at'])
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
instance = cls()
|
||||
instance.data = data
|
||||
|
||||
return instance
|
||||
|
||||
def __getattr__(self, attr):
|
||||
return self.data[attr]
|
||||
|
||||
|
||||
class DiscourseTopic(object):
|
||||
def __init__(self):
|
||||
self.posts = None
|
||||
self.data = None
|
||||
self.raw_response = None
|
||||
|
||||
def created_by(self):
|
||||
return DiscourseUser.from_summary(self.data['details']['created_by'])
|
||||
|
||||
def created_at(self):
|
||||
return datetime_from(self.data['created_at'])
|
||||
|
||||
def last_posted_at(self):
|
||||
return datetime_from(self.data['last_posted_at'])
|
||||
|
||||
def num_unread(self):
|
||||
if self.data['unseen']:
|
||||
return 1
|
||||
|
||||
return self.data['new_posts']
|
||||
|
||||
def participants(self):
|
||||
return [DiscourseUser.from_summary(u) for u in self.data['participants']]
|
||||
|
||||
@classmethod
|
||||
def from_response(cls, response):
|
||||
instance = DiscourseTopic.from_dict(response)
|
||||
instance.raw_response = response
|
||||
instance.posts = []
|
||||
for post in response['post_stream']['posts']:
|
||||
instance.posts.append(DiscoursePost.from_dict(post))
|
||||
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
instance = cls()
|
||||
instance.data = data
|
||||
return instance
|
||||
|
||||
def fetch_remaining_posts(self, discourse):
|
||||
""" The initial topic response is paginated, this makes another request to get additional posts
|
||||
"""
|
||||
if self.data['posts_count'] > len(self.posts):
|
||||
missing = self.data['post_stream']['stream'][len(self.posts):]
|
||||
response = discourse.posts(self.id, missing)
|
||||
for post in response['post_stream']['posts']:
|
||||
self.posts.append(DiscoursePost.from_dict(post))
|
||||
|
||||
def __getattr__(self, attr):
|
||||
return self.data[attr]
|
||||
|
||||
def __repr__(self):
|
||||
return u'<Topic {0}>'.format(self.title)
|
||||
|
||||
|
||||
class DiscourseTopicSet(object):
|
||||
def __init__(self, topics):
|
||||
self.topics = topics
|
||||
|
||||
def all(self):
|
||||
return self.topics
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.topics)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.topics)
|
||||
|
||||
@classmethod
|
||||
def from_response(cls, response):
|
||||
topics = response.get('topic_list', {}).get('topics', [])
|
||||
topics = [DiscourseTopic.from_dict(t) for t in topics]
|
||||
return cls(topics)
|
||||
@@ -60,12 +60,6 @@ class DiscourseClient(object):
|
||||
def update_username(self, username, new_username, **kwargs):
|
||||
return self._put('/users/{0}/preferences/username'.format(username), username=new_username, **kwargs)
|
||||
|
||||
def set_preference(self, username=None, **kwargs):
|
||||
if username is None:
|
||||
username = self.api_username
|
||||
|
||||
return self._put(u'/users/{0}'.format(username), **kwargs)
|
||||
|
||||
def generate_api_key(self, userid, **kwargs):
|
||||
return self._post('/admin/users/{0}/generate_api_key'.format(userid), **kwargs)
|
||||
|
||||
@@ -129,11 +123,6 @@ class DiscourseClient(object):
|
||||
"""
|
||||
return self._post('/posts', raw=content, **kwargs)
|
||||
|
||||
def update_post(self, post_id, content, edit_reason='', **kwargs):
|
||||
kwargs['post[raw]'] = content
|
||||
kwargs['post[edit_reason'] = edit_reason
|
||||
return self._put('/posts/{0}'.format(post_id), **kwargs)
|
||||
|
||||
def topics_by(self, username, **kwargs):
|
||||
url = '/topics/created-by/{0}.json'.format(username)
|
||||
return self._get(url, **kwargs)['topic_list']['topics']
|
||||
@@ -149,42 +138,9 @@ class DiscourseClient(object):
|
||||
kwargs['term'] = term
|
||||
return self._get('/search.json', **kwargs)
|
||||
|
||||
def create_category(self, name, color, text_color='FFFFFF', permissions=None, parent=None, **kwargs):
|
||||
""" permissions - dict of 'everyone', 'admins', 'moderators', 'staff' with values of
|
||||
"""
|
||||
|
||||
kwargs['name'] = name
|
||||
kwargs['color'] = color
|
||||
kwargs['text_color'] = text_color
|
||||
|
||||
if permissions is None and 'permissions' not in kwargs:
|
||||
permissions = {'everyone': '1'}
|
||||
|
||||
for key, value in permissions.items():
|
||||
kwargs['permissions[{0}]'.format(key)] = value
|
||||
|
||||
if parent:
|
||||
parent_id = None
|
||||
for category in self.categories():
|
||||
if category['name'] == parent:
|
||||
parent_id = category['id']
|
||||
continue
|
||||
|
||||
if not parent_id:
|
||||
raise DiscourseClientError(u'{0} not found'.format(parent))
|
||||
kwargs['parent_category_id'] = parent_id
|
||||
|
||||
return self._post('/categories', **kwargs)
|
||||
|
||||
def categories(self, **kwargs):
|
||||
return self._get('/categories.json', **kwargs)['category_list']['categories']
|
||||
|
||||
def category(self, name, parent=None, **kwargs):
|
||||
if parent:
|
||||
name = u'{0}/{1}'.format(parent, name)
|
||||
|
||||
return self._get(u'/category/{0}.json'.format(name), **kwargs)
|
||||
|
||||
def site_settings(self, **kwargs):
|
||||
for setting, value in kwargs.items():
|
||||
setting = setting.replace(' ', '_')
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
"""
|
||||
Utilities to implement Single Sign On for Discourse with a Python managed authentication DB
|
||||
|
||||
https://meta.discourse.org/t/official-single-sign-on-for-discourse/13045
|
||||
|
||||
Thanks to James Potter for the heavy lifting, detailed at https://meta.discourse.org/t/sso-example-for-django/14258
|
||||
|
||||
A SSO request handler might look something like
|
||||
|
||||
@login_required
|
||||
def discourse_sso_view(request):
|
||||
payload = request.GET.get('sso')
|
||||
signature = request.GET.get('sig')
|
||||
try:
|
||||
nonce = sso_validate(payload, signature, SECRET)
|
||||
except DiscourseError as e:
|
||||
return HTTP400(e.args[0])
|
||||
|
||||
url = sso_redirect_url(nonce, SECRET, request.user.email, request.user.id, request.user.username)
|
||||
return redirect('http://discuss.example.com' + url)
|
||||
"""
|
||||
import base64
|
||||
import hmac
|
||||
import hashlib
|
||||
|
||||
try: # py3
|
||||
from urllib.parse import unquote, urlencode
|
||||
except ImportError:
|
||||
from urllib import unquote, urlencode
|
||||
|
||||
|
||||
from pydiscourse.exceptions import DiscourseError
|
||||
|
||||
|
||||
def sso_validate(payload, signature, secret):
|
||||
"""
|
||||
payload: provided by Discourse HTTP call to your SSO endpoint as sso GET param
|
||||
signature: provided by Discourse HTTP call to your SSO endpoint as sig GET param
|
||||
secret: the secret key you entered into Discourse sso secret
|
||||
|
||||
return value: The nonce used by discourse to validate the redirect URL
|
||||
"""
|
||||
if None in [payload, signature]:
|
||||
raise DiscourseError('No SSO payload or signature.')
|
||||
|
||||
if not secret:
|
||||
raise DiscourseError('Invalid secret..')
|
||||
|
||||
payload = unquote(payload)
|
||||
if not payload:
|
||||
raise DiscourseError('Invalid payload..')
|
||||
|
||||
decoded = base64.decodestring(payload)
|
||||
if 'nonce' not in decoded:
|
||||
raise DiscourseError('Invalid payload..')
|
||||
|
||||
h = hmac.new(secret, payload, digestmod=hashlib.sha256)
|
||||
this_signature = h.hexdigest()
|
||||
|
||||
if this_signature != signature:
|
||||
raise DiscourseError('Payload does not match signature.')
|
||||
|
||||
nonce = decoded.split('=')[1]
|
||||
|
||||
return nonce
|
||||
|
||||
|
||||
def sso_redirect_url(nonce, secret, email, external_id, username, **kwargs):
|
||||
"""
|
||||
nonce: returned by sso_validate()
|
||||
secret: the secret key you entered into Discourse sso secret
|
||||
user_email: email address of the user who logged in
|
||||
user_id: the internal id of the logged in user
|
||||
user_username: username of the logged in user
|
||||
|
||||
return value: URL to redirect users back to discourse, now logged in as user_username
|
||||
"""
|
||||
kwargs.update({
|
||||
'nonce': nonce,
|
||||
'email': email,
|
||||
'external_id': external_id,
|
||||
'username': username
|
||||
})
|
||||
|
||||
return_payload = base64.encodestring(urlencode(kwargs))
|
||||
h = hmac.new(secret, return_payload, digestmod=hashlib.sha256)
|
||||
query_string = urlencode({'sso': return_payload, 'sig': h.hexdigest()})
|
||||
|
||||
return '/session/sso_login?%s' % query_string
|
||||
@@ -1,3 +1,3 @@
|
||||
-r requirements.txt
|
||||
requests
|
||||
nose
|
||||
mock
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
requests
|
||||
@@ -5,7 +5,7 @@ from setuptools import setup, find_packages
|
||||
|
||||
|
||||
def read(fname):
|
||||
return codecs.open(os.path.join(os.path.dirname(__file__), fname), 'rt').read()
|
||||
return codecs.open(os.path.join(os.path.dirname(__file__), fname)).read()
|
||||
|
||||
|
||||
# Provided as an attribute, so you can append to these instead
|
||||
@@ -47,8 +47,6 @@ setup(
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Operating System :: OS Independent",
|
||||
"Programming Language :: Python",
|
||||
"Programming Language :: Python :: 2.6",
|
||||
"Programming Language :: Python :: 2.7",
|
||||
],
|
||||
zip_safe=False,
|
||||
)
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
import base64
|
||||
|
||||
try: # py26
|
||||
import unittest2 as unittest
|
||||
except ImportError:
|
||||
import unittest
|
||||
|
||||
|
||||
try: # py3
|
||||
from urllib.parse import unquote
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
except ImportError:
|
||||
from urlparse import urlparse, parse_qs
|
||||
from urllib import unquote
|
||||
|
||||
|
||||
from pydiscourse import sso
|
||||
from pydiscourse.exceptions import DiscourseError
|
||||
|
||||
|
||||
class SSOTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# values from https://meta.discourse.org/t/official-single-sign-on-for-discourse/13045
|
||||
self.secret = 'd836444a9e4084d5b224a60c208dce14'
|
||||
self.nonce = 'cb68251eefb5211e58c00ff1395f0c0b'
|
||||
self.payload = 'bm9uY2U9Y2I2ODI1MWVlZmI1MjExZTU4YzAwZmYxMzk1ZjBjMGI%3D%0A'
|
||||
self.signature = '2828aa29899722b35a2f191d34ef9b3ce695e0e6eeec47deb46d588d70c7cb56'
|
||||
|
||||
self.name = u'sam'
|
||||
self.username = u'samsam'
|
||||
self.external_id = u'hello123'
|
||||
self.email = u'test@test.com'
|
||||
self.redirect_url = u'/session/sso_login?sso=bm9uY2U9Y2I2ODI1MWVlZmI1MjExZTU4YzAwZmYxMzk1ZjBjMGImbmFtZT1z%0AYW0mdXNlcm5hbWU9c2Ftc2FtJmVtYWlsPXRlc3QlNDB0ZXN0LmNvbSZleHRl%0Acm5hbF9pZD1oZWxsbzEyMw%3D%3D%0A&sig=1c884222282f3feacd76802a9dd94e8bc8deba5d619b292bed75d63eb3152c0b'
|
||||
|
||||
|
||||
class Test_sso_validate(SSOTestCase):
|
||||
def test_missing_args(self):
|
||||
with self.assertRaises(DiscourseError):
|
||||
sso.sso_validate(None, self.signature, self.secret)
|
||||
|
||||
with self.assertRaises(DiscourseError):
|
||||
sso.sso_validate('', self.signature, self.secret)
|
||||
|
||||
with self.assertRaises(DiscourseError):
|
||||
sso.sso_validate(self.payload, None, self.secret)
|
||||
|
||||
def test_invalid_signature(self):
|
||||
with self.assertRaises(DiscourseError):
|
||||
sso.sso_validate(self.payload, 'notavalidsignature', self.secret)
|
||||
|
||||
def test_valid_nonce(self):
|
||||
nonce = sso.sso_validate(self.payload, self.signature, self.secret)
|
||||
self.assertEqual(nonce, self.nonce)
|
||||
|
||||
|
||||
class Test_sso_redirect_url(SSOTestCase):
|
||||
def test_valid_redirect_url(self):
|
||||
url = sso.sso_redirect_url(self.nonce, self.secret, self.email, self.external_id, self.username, name='sam')
|
||||
|
||||
self.assertIn('/session/sso_login', url[:20])
|
||||
|
||||
# check its valid, using our own handy validator
|
||||
params = parse_qs(urlparse(url).query)
|
||||
payload = params['sso'][0]
|
||||
sso.sso_validate(payload, params['sig'][0], self.secret)
|
||||
|
||||
# check the params have all the data we expect
|
||||
payload = base64.decodestring(payload)
|
||||
payload = unquote(payload)
|
||||
payload = dict((p.split('=') for p in payload.split('&')))
|
||||
|
||||
self.assertEqual(payload, {
|
||||
'username': self.username,
|
||||
'nonce': self.nonce,
|
||||
'external_id': self.external_id,
|
||||
'name': self.name,
|
||||
'email': self.email
|
||||
})
|
||||
Reference in New Issue
Block a user