Commit d6aebe2c authored by Rob Carleski's avatar Rob Carleski 🇮🇸
Browse files

V2

parent 67aefed3
......@@ -3,3 +3,4 @@
hacking_dir=$(readlink -fn $(dirname "$BASH_SOURCE"))
VIRTUAL_ENV_DISABLE_PROMPT=1
. $hacking_dir/venv/bin/activate
PATH=$hacking_dir/mcommunity/:$PATH
from .mcommunity import Client
from .mcommunity import MCommClient
from .core import MCommError
from urllib.parse import quote
def validate_name(client, name):
"""Validate a given name against the MCommunity API
Parameters
----------
client : obj
An instance of mcommunity.MCommClient
name : str
The name to validate
Returns
-------
boolean
true or false, depending on validity status.
"""
r = client.get('/isValidName/{}'.format(name)).json()
if r['valid']:
return True
elif not r['valid']:
raise MCommError(r['error'][0]['message'])
def get_entity_dn(client, name):
"""Fetch the DN for a specified entity in MCommunity
Parameters
---------
client : obj
An instance of mcommunity.MCommClient
name : str
The name of the target entity
Returns
-------
str
The DN of the given entity
"""
if '=' in name or '@' in name:
return name
data = client.get('/find/both/{}'.format(name)).json()
if data:
for item in data:
if item['person']:
if item['naming'].casefold() == name.casefold():
return item['dn']
elif item['group']:
if item['displayName'].casefold() == name:
return item['dn']
else:
r = client.get('/profile/dn/{}'.format(
quote(item['dn'])
))
if r.ok:
group = r.json()['group'][0]
aliases = group['aliases']
aliases.append(group['name'])
if name in [x.casefold() for x in aliases]:
return item['dn']
class MCommError(Exception):
pass
This diff is collapsed.
......@@ -29,7 +29,7 @@ setup(
packages=find_packages(),
install_requires=[
'requests',
'ldap3',
'ldap3'
],
setup_requires=[
'pytest-runner',
......
import os
import subprocess
import pytest
pytest_plugins = "pep8"
@pytest.fixture(scope="session")
def mock_mcomm():
p = subprocess.Popen(['python3', os.path.join(os.path.dirname(__file__), 'mock_mcomm.py')])
yield p
p.kill()
#!/usr/bin/env python
import json
import os
from flask import Flask
app = Flask(__name__)
def get_testdata(base, name):
fname = os.path.join(os.path.dirname(__file__), 'data/{}_{}.json'.format(base, name))
if os.path.exists(fname):
with open(fname, 'r') as f:
return f.read()
else:
return '{}'
@app.route("/iamGroups/create", methods=['POST'])
@app.route("/iamGroups/delete/<dn>", methods=['GET'])
@app.route("/iamGroups/renew/<dn>", methods=['GET'])
@app.route("/iamGroups/reserve", methods=['POST'])
@app.route("/iamGroups/update/<attrib>", methods=['POST'])
def return_success(attrib=False, dn=False):
return '{"status": "success"}'
@app.route("/iamGroups/profile/dn/<dn>", methods=['GET'])
def get_group(dn):
cn = dn.split(',')[0].split('=')[1]
return get_testdata('profile', cn)
@app.route("/iamGroups/find/both/<name>", methods=['GET'])
def get_both(name):
return get_testdata('find_both', name)
@app.route("/iamGroups/find/person/<uid>", methods=['GET'])
def get_user(uid):
return get_testdata('find_person', uid)
@app.route("/iamGroups/isValidName/<name>", methods=['GET'])
def is_valid_name(name):
if name != 'badname':
return '{"valid": "true"}'
else:
return '{"valid": "false"}'
@app.route("/inst/oauth2/token", methods=['POST'])
def return_token():
return '{"access_token": "a1b2c3d4c5d6e7f8g9h10i11j12k13l14"}'
def main():
app.run()
if __name__ == '__main__':
main()
import mcommunity
import json
import os
import pytest
from unittest import mock
from mcommunity.mcommunity import MCommClient
config = {
'client_id': '1234567890',
'secret': '123abc-456def-789ghi',
'url_base': 'http://localhost:5000'
}
data_dir = os.path.join(os.path.dirname(__file__) + '/data/')
def test_group_fetch(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.fetch_group('testgroup')
assert conn.group_data['name'] == 'testgroup'
@pytest.fixture
def mcclient():
def _loader(filenames):
for fname in filenames:
with open(data_dir + fname, 'r') as f:
data = json.load(f)
yield data
def test_group_fetch_by_cn(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.fetch_group('alias1')
assert conn.group_data['name'] == 'testgroup'
@mock.patch('mcommunity.mcommunity.MCommSession')
def _client(json_files, session):
session.return_value.get.return_value.json.side_effect = _loader(
json_files
)
session.return_value.token = '12345'
client = MCommClient(client_id='12345', secret='abdce')
return client
return _client
def test_person_fetch(mock_mcomm):
conn = mcommunity.Client(config=config)
person = conn.fetch_person('testuser')
assert person['naming'] == 'testuser'
def test_group_creation(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.create_group('testgroup')
def test_group_reservation(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.reserve_group('testgroup')
def test_group_deletion(mock_mcomm):
conn = mcommunity.Client(config=config)
r = conn.delete_group('testgroup')
assert r['status'] == 'success'
def test_group_renew(mock_mcomm):
conn = mcommunity.Client(config=config)
r = conn.renew_group('testgroup')
assert r['status'] == 'success'
def test_group_update_aliases(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.fetch_group('testgroup')
conn.update_group_aliases('testalias')
assert 'testalias' in conn.group_data['aliases']
def test_group_update_description(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.fetch_group('testgroup')
conn.update_group_description('test description')
assert conn.group_data['description'] == 'test description'
def test_group_update_notice(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.fetch_group('testgroup')
conn.update_group_notice('test notice')
assert conn.group_data['notice'] == 'test notice'
def test_group_update_links_labeled(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.fetch_group('testgroup')
links = ('Test Link', 'https://test.link')
conn.update_group_links(links)
assert conn.group_data['labeledUri'][0]['urlLabel'] == 'Test Link'
assert conn.group_data['labeledUri'][0]['urlValue'] == 'https://test.link'
def test_group_update_links_plain(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.fetch_group('testgroup')
links = 'https://test.link'
conn.update_group_links(links)
assert conn.group_data['labeledUri'][0]['urlValue'] == 'https://test.link'
def test_group_owners_update(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.fetch_group('testgroup')
conn.add_group_owners('testuser2')
conn.update_group_owners()
testuser2 = conn._create_entity_ldap('testuser2')
assert testuser2 in conn.group_data['ownerDn']
conn.remove_group_owners('testuser2')
conn.update_group_owners()
assert testuser2 not in conn.group_data['ownerDn']
def test_group_fetch(mcclient):
response_files = [
'find_both_testgroup.json',
'profile_testgroup.json'
]
group = mcclient(response_files).group('testgroup')
assert group.dn
def test_group_members_update(mock_mcomm):
conn = mcommunity.Client(config=config)
conn.fetch_group('testgroup')
members = [
'testuser2',
'testgroup2',
'test@domain.tld'
def test_group_fetch_by_cn(mcclient):
response_files = [
'find_both_alias1.json',
'profile_testgroup.json',
'profile_testgroup.json'
]
conn.add_group_members(members)
conn.update_group_members()
testuser2 = conn._create_entity_ldap('testuser2')
testgroup2 = conn._create_entity_ldap('testgroup2')
externalMember = 'test@domain.tld'
assert testuser2 in conn.group_data['memberDn']
assert testgroup2 in conn.group_data['memberGroupDn']
assert conn.group_data['memberExternal'][0]['email'] == externalMember
conn.remove_group_members(members)
conn.update_group_members()
assert testuser2 not in conn.group_data['memberDn']
assert testgroup2 not in conn.group_data['memberGroupDn']
assert not conn.group_data['memberExternal']
group = mcclient(response_files).group('alias1')
assert group.dn
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment