Commit 8ee880fa authored by Rob Carleski's avatar Rob Carleski 🇮🇸
Browse files

Let's go, mcommunity!

parents
__pycache__
*.pyc
bin/
---
image: python:3.7
stages:
- test
test:
stage: test
before_script:
- pip install -U pip
- pip install -I -r python-requirements.txt
- cd tests/
script:
- pytest -v
#!/bin/bash
virtualenv -p python36 --system-site-packages bin/python
. bin/python/bin/activate
pip install -U pip
pip install -I -r python-requirements.txt
from setuptools import setup, find_packages
setup(name='umich_mcomm',
version='1.00',
description='Library for interacting with MCommunity APIs',
url='https://gitlab.umich.edu/carleski/python-mcommunity.git',
author='Rob Carleski',
author_email='carleski@umich.edu',
license='MIT',
packages=find_packages(),
zip_safe=False)
import flask
import os
import pytest
import subprocess
@pytest.fixture(scope="session")
def mock_mcomm():
os.environ["FLASK_APP"] = 'mock_mcomm.py'
p = subprocess.Popen(['flask', 'run'])
yield p
p.kill()
#!/usr/bin/env python
import json
from flask import Flask
app = Flask(__name__)
@app.route("/iamGroups/create", methods=['POST'])
@app.route("/iamGroups/delete/<dn>", methods=['GET'])
@app.route("/iamGroups/renew/<dn>", methods=['GET'])
@app.route("/iamGroups/reserve", methods=['POST'])
@app.route("/iamGroups/update/<attrib>", methods=['POST'])
def return_success(attrib=False, dn=False):
return '{"status": "success"}'
@app.route("/iamGroups/profile/dn/<dn>", methods=['GET'])
def get_group(dn):
return '{"group":[{"objectClass":["top","groupofnames","umichgroup","rfc822mailgroup","umichexpire","dirxml-entitlementrecipient","posixgroup","ndsloginproperties"],"dn":"cn=testgroup,ou=user groups,ou=groups,dc=umich,dc=edu","name":"testgroup","description":null,"aliases":["alias1","alias2","alias3"],"memberDn":["uid=testuser,ou=people,dc=umich,dc=edu"],"ownerDn":["uid=testuser,ou=people,dc=umich,dc=edu","cn=controller-group,ou=user groups,ou=groups,dc=umich,dc=edu"],"owners":[{"dn":"uid=testuser,ou=people,dc=umich,dc=edu","naming":"testuser","displayName":"Test Usersson","email":"testuser@umich.edu","displayTitle":"Application Operations System Administrator Senior","title":["Application Operations System Administrator Senior"],"affiliation":["ITS Infra Sys Application Ops - Faculty and Staff","ITS Infrastrc - Systems - Faculty and Staff"],"description":null,"member":false,"owner":false,"moderator":false,"person":true,"group":false,"external":false,"securityEquals":null,"groupMembership":null,"role":null}],"expiredDate":"02/05/2020","email":"testgroup","isSpamBlocked":true,"isEmailWarningSuppressed":true,"isJoinable":false,"externalSystems":["Box","Google"],"isPrivate":true,"isEmailableByMembersOnly":false,"xmlAssociations":null,"moderatorRaw":null,"moderator":null,"memberGroupDn":null,"groupMemberDnRaw":null,"memberExternalRaw":null,"memberExternal":null,"renewAuthority":null,"disabled":false,"disabledBy":null,"disabledMessage":null,"disabledDate":null,"purgeDate":null,"notice":null,"acl":null,"equivalentToMe":null,"descriptionLevel":"PUBLIC","noticeLevel":"PUBLIC","urlLevel":"PUBLIC","errorsTo":null,"errorsToExternalRaw":null,"errorsToExternal":null,"requestTo":null,"requestToExternalRaw":null,"requestToExternal":null,"gidNumber":"2280774","editable":false,"urlLinks":null,"labeledUri":null,"authenticatedUserRole":null,"authenticatedUserRoles":null,"renewable":true,"moderated":false}],"error":null}'
@app.route("/iamGroups/find/both/<name>", methods=['GET'])
def get_both(name):
if name == 'testuser':
return '[{"dn":"uid=testuser,ou=People,dc=umich,dc=edu","naming":"testuser","displayName":"Test Usersson","email":"testuser@umich.edu","displayTitle":"Application Operations System Administrator Senior","title":["Application Operations System Administrator Senior"],"affiliation":["ITS Infra Sys Application Ops - Faculty and Staff","ITS Infrastrc - Systems - Faculty and Staff"],"description":null,"member":false,"owner":false,"moderator":false,"person":true,"group":false,"external":false,"securityEquals":null,"groupMembership":null,"role":null},{"dn":"cn=testuser-group,ou=User Groups,ou=Groups,dc=umich,dc=edu","naming":null,"displayName":"testuser-group","email":null,"displayTitle":null,"title":null,"affiliation":null,"description":null,"member":false,"owner":false,"moderator":false,"person":false,"group":true,"external":false,"securityEquals":null,"groupMembership":null,"role":null},{"dn":"cn=testuser-dev,ou=User Groups,ou=Groups,dc=umich,dc=edu","naming":null,"displayName":"testuser-dev","email":null,"displayTitle":null,"title":null,"affiliation":null,"description":null,"member":false,"owner":false,"moderator":false,"person":false,"group":true,"external":false,"securityEquals":null,"groupMembership":null,"role":null},{"dn":"cn=test-testuser,ou=User Groups,ou=Groups,dc=umich,dc=edu","naming":null,"displayName":"test-testuser","email":null,"displayTitle":null,"title":null,"affiliation":null,"description":null,"member":false,"owner":false,"moderator":false,"person":false,"group":true,"external":false,"securityEquals":null,"groupMembership":null,"role":null}]'
elif name == 'testuser2':
return '[{"dn":"uid=testuser2,ou=People,dc=umich,dc=edu","naming":"testuser2","displayName":"Test Usersson Jr.","email":"testuser2@umich.edu","displayTitle":"Application Operations System Administrator Senior","title":["Application Operations System Administrator Senior"],"affiliation":["ITS Infra Sys Application Ops - Faculty and Staff","ITS Infrastrc - Systems - Faculty and Staff"],"description":null,"member":false,"owner":false,"moderator":false,"person":true,"group":false,"external":false,"securityEquals":null,"groupMembership":null,"role":null}]'
elif name =='testgroup':
return '[{"dn":"cn=testgroup,ou=User Groups,ou=Groups,dc=umich,dc=edu","naming":null,"displayName":"testgroup","email":null,"displayTitle":null,"title":null,"affiliation":null,"description":null,"member":false,"owner":false,"moderator":false,"person":false,"group":true,"external":false,"securityEquals":null,"groupMembership":null,"role":null}]'
elif name =='testgroup2':
return '[{"dn":"cn=testgroup2,ou=User Groups,ou=Groups,dc=umich,dc=edu","naming":null,"displayName":"testgroup2","email":null,"displayTitle":null,"title":null,"affiliation":null,"description":null,"member":false,"owner":false,"moderator":false,"person":false,"group":true,"external":false,"securityEquals":null,"groupMembership":null,"role":null}]'
else:
return {}
@app.route("/iamGroups/find/person/<uid>", methods=['GET'])
def get_user(uid):
return '{"dn":"uid=testuser,ou=People,dc=umich,dc=edu","naming":"testuser","displayName":"Test Usersson","email":"testuser@umich.edu","displayTitle":"Application Operations System Administrator Senior","title":["Application Operations System Administrator Senior"],"affiliation":["ITS Infra Sys Application Ops - Faculty and Staff","ITS Infrastrc - Systems - Faculty and Staff"],"description":null,"member":false,"owner":false,"moderator":false,"person":true,"group":false,"external":false,"securityEquals":null,"groupMembership":null,"role":null}'
@app.route("/iamGroups/isValidName/<name>", methods=['GET'])
def is_valid_name(name):
if name != 'badname':
return '{"valid": "true"}'
else:
return '{"valid": "false"}'
@app.route("/inst/oauth2/token", methods=['POST'])
def return_token():
return '{"access_token": "a1b2c3d4c5d6e7f8g9h10i11j12k13l14"}'
from umich_mcomm import mcommunity
config = {
'client_id': '1234567890',
'secret': '123abc-456def-789ghi',
'url_base': 'http://localhost:5000'
}
def test_group_fetch(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
conn.fetch_group('testgroup')
assert conn.group_data['name'] == 'testgroup'
def test_person_fetch(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
person = conn.fetch_person('testuser')
assert person['naming'] == 'testuser'
def test_group_creation(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
conn.create_group('testgroup')
assert conn.group_data['name'] == 'testgroup'
def test_group_deletion(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
r = conn.delete_group('testgroup')
assert r['status'] == 'success'
def test_group_renew(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
r = conn.renew_group('testgroup')
assert r['status'] == 'success'
def test_group_update_aliases(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
conn.fetch_group('testgroup')
conn.update_group_aliases('testalias')
assert 'testalias' in conn.group_data['aliases']
def test_group_update_description(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
conn.fetch_group('testgroup')
conn.update_group_description('test description')
assert conn.group_data['description'] == 'test description'
def test_group_update_notice(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
conn.fetch_group('testgroup')
conn.update_group_notice('test notice')
assert conn.group_data['notice'] == 'test notice'
def test_group_update_links_labeled(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
conn.fetch_group('testgroup')
links = ('Test Link', 'https://test.link')
conn.update_group_links(links)
assert conn.group_data['labeledUri'][0]['urlLabel'] == 'Test Link'
assert conn.group_data['labeledUri'][0]['urlValue'] == 'https://test.link'
def test_group_update_links_plain(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
conn.fetch_group('testgroup')
links = 'https://test.link'
conn.update_group_links(links)
assert conn.group_data['labeledUri'][0]['urlValue'] == 'https://test.link'
def test_group_owners_update(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
conn.fetch_group('testgroup')
conn.add_group_owners('testuser2')
testuser2 = conn._create_entity_ldap('testuser2')
assert testuser2 in conn.group_data['ownerDn']
conn.remove_group_owners('testuser2')
assert testuser2 not in conn.group_data['ownerDn']
def test_group_members_update(mock_mcomm):
conn = mcommunity.Mcommunity(config=config)
conn.fetch_group('testgroup')
members = [
'testuser2',
'testgroup2',
'test@domain.tld'
]
conn.add_group_members(members)
testuser2 = conn._create_entity_ldap('testuser2')
testgroup2 = conn._create_entity_ldap('testgroup2')
externalMember = 'test@domain.tld'
assert testuser2 in conn.group_data['memberDn']
assert testgroup2 in conn.group_data['memberGroupDn']
assert conn.group_data['memberExternal'][0]['email'] == externalMember
conn.remove_group_members(members)
assert testuser2 not in conn.group_data['memberDn']
assert testgroup2 not in conn.group_data['memberGroupDn']
assert not conn.group_data['memberExternal']
import requests
import json
from urllib.parse import quote
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
class Mcommunity:
def __init__(self, client_id='', secret='', config=False):
self.client_id = client_id
self.secret = secret
self.url_base = 'https://apigw.it.umich.edu/um'
self.timeout = 10
self.port = 80
if isinstance(config, dict):
self.__dict__.update(config)
self.scope = 'iamgroups'
self.token_url = self.url_base + '/inst/oauth2/token'
self.call_url = self.url_base + '/iamGroups'
self.session = requests.Session()
retries = Retry(
total=5,
backoff_factor=0.2,
status_forcelist=[500, 502, 503, 504]
)
self.session.mount('http://', HTTPAdapter(max_retries=retries))
try:
self._request_token()
except KeyError:
raise KeyError('Unable to get access token from API')
self.headers = {
'x-ibm-client-id': '{}'.format(self.client_id),
'authorization': 'Bearer {}'.format(self.token),
'accept': 'application/json'
}
def _request_token(self):
data = {
'grant_type': 'client_credentials',
'scope': 'constituents'
}
headers = {
'Content-type': 'application/x-www-form-urlencoded',
'accept': 'application/json'
}
url_append = '?grant_type=client_credentials&scope={}'.format(
self.scope
)
url = self.token_url + url_append
r = self.session.post(
url,
data=json.dumps(data),
headers=headers,
auth=(self.client_id, self.secret),
timeout=self.timeout
)
self.token = r.json()['access_token']
def _validate_name(self, name):
"""Validate a given name against MCommunity standards
Parameters
----------
name : str
The name to validate
Returns
-------
boolean
true or false, depending on validity status.
"""
endpoint = self.call_url + '/isValidName/{}'.format(name)
r = self.session.get(
url=endpoint,
headers=self.headers,
timeout=self.timeout
)
if r.json()['valid']:
return True
else:
raise ValueError(r.json()['error'][0]['message'])
def _create_entity_ldap(self, name):
"""Create an LDAP string for a given object
Parameters
---------
name : str
The name of the object
Returns
-------
str
An LDAP string representation of the object.
"""
if '=' in name or '@' in name:
return name
endpoint = self.call_url + '/find/both/{}'.format(name)
r = self.session.get(
url=endpoint,
headers=self.headers,
timeout=self.timeout
)
if r.status_code == requests.codes.ok:
data = r.json()
if data[0]['person']:
return 'uid={},ou=people,dc=umich,dc=edu'.format(
name
)
elif data[0]['group']:
return data[0]['dn'].lower()
else:
raise ValueError('Entity found but unidentifiable.')
def _apply_update(self, endpoint):
"""Generic update function
Parameters
----------
endpoint : str
The API endpoint to POST data to
Returns
-------
obj
A Requests response object
"""
endpoint = self.call_url + endpoint
r = self.session.post(
url=endpoint,
data=json.dumps(self.group_data),
headers=self.headers,
timeout=self.timeout
)
if r.status_code == requests.codes.ok:
if r.json()['status'] == 'success':
return r
else:
raise Exception('{}: {}'.format(r.status_code, r.text))
def fetch_group(self, name):
"""Fetch information for an mcommunity group
Parameters
----------
name : str
The name of the mcommunity group to fetch data for
Returns
-------
None
Nothing returned; self.group_data is populated instead.
"""
dn = self._create_entity_ldap(name)
encoded_dn = quote(dn)
endpoint = self.call_url + '/profile/dn/{}'.format(encoded_dn)
r = self.session.get(
url=endpoint,
headers=self.headers,
timeout=self.timeout
)
if r.status_code == requests.codes.ok:
self.group_data = r.json()['group'][0]
else:
raise Exception('{}: {}'.format(r.status_code, r.text))
def fetch_person(self, name):
"""Fetch information about a user from mcommunity
Parameters
----------
name : str
The uniqname of the user to fetch data for
Returns
-------
dict
A dict of user information
"""
endpoint = self.call_url + '/find/person/{}'.format(name)
r = self.session.get(
url=endpoint,
headers=self.headers,
timeout=self.timeout
)
if r.status_code == requests.codes.ok:
return r.json()
else:
raise Exception('{}: {}'.format(r.status_code, r.text))
def create_group(self, name):
"""Create a new mcommunity group
Parameters
----------
name : str
The name of the mcommunity group to create
Returns
-------
none
Nothing returned. After creation, group is fetched.
"""
if self._validate_name(name):
endpoint = self.call_url + '/create'
data = {
'name': name
}
self.session.post(
url=endpoint,
data=json.dumps(data),
headers=self.headers,
timeout=self.timeout
)
self.fetch_group(name)
def delete_group(self, name):
"""Delete an mcommunity group
Parameters
----------
name : str
The name of the group to delete
Returns
-------
dict
A dict of response information from the server.
"""
dn = self._create_entity_ldap(name)
encoded_dn = quote(dn)
endpoint = self.call_url + '/delete/{}'.format(encoded_dn)
r = self.session.get(
url=endpoint,
headers=self.headers,
timeout=self.timeout
)
if r.status_code == requests.codes.ok:
return r.json()
else:
raise Exception('{}: {}'.format(r.status_code, r.text))
def renew_group(self, name):
"""Renew an mcommunity group
Parameters
----------
name : str
The name of the group to renew
Returns
-------
dict
A dict of response information from the server.
"""
dn = self._create_entity_ldap(name)
encoded_dn = quote(dn)
endpoint = self.call_url + '/renew/{}'.format(encoded_dn)
r = self.session.get(
url=endpoint,
headers=self.headers,
timeout=self.timeout
)
if r.status_code == requests.codes.ok:
return r.json()
else:
raise Exception('{}: {}'.format(r.status_code, r.text))
# TODO: This endpoint appears to be broken right now.
# We should come back to this, though.
# Endpoint: /reserve/{dn}
def reserve_group(self, name):
pass
def update_group_aliases(self, aliases):
"""Update mcommunity group aliases
Parameters
----------
aliases : str, list
A single alias, or a list thereof.
Returns
-------
dict
A dict of response information from the server.
"""
if not isinstance(aliases, list):
aliases = [aliases]
for alias in aliases:
if self._validate_name(alias):
if self.group_data['aliases']:
self.group_data['aliases'].append(alias)
else:
self.group_data['aliases'] = [alias]
else:
print('{} is an invalid alias.'.format(alias))
return self._apply_update('/update/aliases')
def update_group_description(self, description):
"""Update mcommunity group description
Parameters
----------
description : str
A description to apply to the group.
Returns
-------
dict
A dict of response information from the server.
"""
self.group_data['description'] = str(description)
return self._apply_update('/update/description')
def update_group_notice(self, notice):
"""Update mcommunity group notice
Parameters
----------
notice : str
A notice to apply to the group.
Returns
-------
dict
A dict of response information from the server.
"""
self.group_data['notice'] = str(notice)
return self._apply_update('/update/notice')
def update_group_links(self, links):
"""Update mcommunity group external links
Parameters
----------
links : str, tuple, list
A single HTTP link, a tuple of (name,uri), or a list of either.
Returns
-------
dict
A dict of response information from the server.
"""
if not isinstance(links, list):
links = [links]
if not self.group_data['labeledUri']:
self.group_data['labeledUri'] = []
for link in links:
if isinstance(link, tuple):
self.group_data['labeledUri'].append({
'urlLabel': link[0],
'urlValue': link[1]
})
else:
self.group_data['labeledUri'].append({'urlValue': link})
return self._apply_update('/update/links')
def add_group_members(self, members):
"""Add members to an mcommunity group.
Parameters
----------
members : str, list
A single uniqname, group name, or external member address, or
a list containing any combination thereof.
Returns
-------
dict
A dict of response information from the server.
"""
if not isinstance(members, list):
members = [members]
members = [self._create_entity_ldap(x) for x in members]
for member in members:
if 'uid=' in member:
if self.group_data['memberDn']:
self.group_data['memberDn'].append(member)