Commit 66c00826 authored by Rob Carleski's avatar Rob Carleski 🇮🇸
Browse files

Merge branch 'mcommrbt' into 'master'

Convert MCommunity API module calls. Adjust mocks.

See merge request carleski/collab-admin-kit!1
parents 8735334b cc52cdf9
......@@ -134,7 +134,7 @@ Accounts section of the M+Box support site.
try:
readme = '{}/{}-{}'.format(
self.config['general']['data_dir'],
self.account,
self.username,
'README.txt'
)
with open(readme, 'w+') as stream:
......@@ -167,7 +167,7 @@ http://its.umich.edu/help/'''.format(self.full_name, self.email)
self.config['google']['gam_command'],
'create',
'user',
self.account,
self.username,
'firstname',
self.first_name,
'lastname',
......@@ -199,55 +199,52 @@ https://documentation.its.umich.edu/node/339/
'''
def set_up_mcommunity_group(self):
client = mcommunity.Client(self.config['mcommunity'])
controllerCn = client._create_entity_ldap(
self.config['mcommunity']['api_control_group']
)
ext_addr = self.account.lower() + '@go.itd.umich.edu'
needs_update = False
_control_group = self.config['mcommunity']['api_control_group']
_ext_addr = self.username + '@go.itd.umich.edu'
_needs_update = False
try:
self.logger.info(
'Checking Mcommunity for associated group',
extra={'entity': self.account}
)
client.fetch_group(self.account)
except ValueError as e:
self.logger.info(e, extra={'entity': self.account})
exit(2)
except Exception as e:
client = mcommunity.MCommClient(
self.config['mcommunity']['client_id'],
self.config['mcommunity']['secret']
)
mcomm_group = client.group(self.groupname)
except mcommunity.core.MCommError as e:
self.logger.info(e, extra={'entity': self.account})
if hasattr(client, 'group_data') and client.group_data:
if mcomm_group.exists:
self.logger.info(
'Found associated MCommunity group',
extra={'entity': self.account}
)
for owner in self.owners:
ownerDn = client._create_entity_ldap(owner).lower()
if ownerDn not in client.group_data['ownerDn']:
needs_update = True
break
if set(self.owners) - set(mcomm_group.owners):
_needs_update = True
if self.service in ['google', 'both']:
if type(client.group_data['memberExternalRaw']) is list:
if ext_addr not in client.group_data['memberExternalRaw']:
needs_update = True
else:
needs_update = True
if _ext_addr not in mcomm_group.externalMembers:
_needs_update = True
if needs_update:
if _needs_update:
self.logger.info(
'Mcommunity group needs updating',
extra={'entity': self.account}
)
if controllerCn not in client.group_data['ownerDn']:
if not self.take_group_ownership(self.account):
print(_control_group)
print(mcomm_group.owners)
if _control_group not in mcomm_group.owners:
if not self.take_group_ownership():
self.logger.error(
'Failed to obtain group ownership',
extra={'entity': self.account}
)
exit(2)
client.fetch_group(self.account)
mcomm_group.fetch()
else:
self.logger.info(
'Mcommunity group does not need an update',
......@@ -266,30 +263,20 @@ https://documentation.its.umich.edu/node/339/
'Reserving new Mcommunity group',
extra={'entity': self.account}
)
client.reserve_group(self.account)
self.logger.info(
'Fetching new Mcommunity group',
extra={'entity': self.account}
)
client.fetch_group(self.account)
except Exception as e:
mcomm_group.reserve()
except mcommunity.core.MCommError as e:
self.logger.info(e, extra={'entity': self.account})
exit(2)
if not client.group_data:
self.logger.error(
'Unable to fetch shared account Mcommunity group.',
extra={'entity': self.account}
)
exit(2)
try:
self.logger.info(
'Adding Mcommunity group alias',
extra={'entity': self.account}
)
client.update_group_aliases(self.group_alias)
except Exception as e:
if self.alias not in mcomm_group.aliases:
self.logger.info(
'Adding Mcommunity group alias',
extra={'entity': self.account}
)
mcomm_group.aliases.append(self.alias)
mcomm_group.update_aliases()
except mcommunity.core.MCommError as e:
self.logger.warning(e, extra={'entity': self.account})
try:
......@@ -298,31 +285,30 @@ https://documentation.its.umich.edu/node/339/
extra={'entity': self.account}
)
if self.service in ['google', 'both']:
client.add_group_members(ext_addr)
client.remove_group_members(
self.config['mcommunity']['api_control_group']
)
client.update_group_members()
mcomm_group.externalMembers.append(_ext_addr)
mcomm_group.memberGroups.remove(_control_group)
mcomm_group.update_membership()
except mcommunity.core.MCommError as e:
self.logger.warning(e, extra={'entity': self.account})
try:
self.logger.info(
'Updating Mcommunity group ownership',
extra={'entity': self.account}
)
client.add_group_owners(self.owners)
try:
client.update_group_owners()
client.remove_group_owners(
self.config['mcommunity']['api_control_group']
)
client.update_group_owners()
except Exception:
self.logger.warning(
'Error modifying group ownership. Correct manually.',
extra={'entity': self.account}
)
except Exception as e:
mcomm_group.owners.extend(self.owners)
mcomm_group.owners.remove(
_control_group
)
mcomm_group.update_ownership()
assert len(mcomm_group.owners) > 0
assert _control_group not in mcomm_group.owners
except AssertionError as e:
self.logger.warning(e, extra={'entity': self.account})
except mcommunity.core.MCommError as e:
self.logger.warning(e, extra={'entity': self.account})
def take_group_ownership(self, group):
def take_group_ownership(self):
input('Add api controller as group owner, then press enter.')
return True
......@@ -342,7 +328,7 @@ https://documentation.its.umich.edu/node/339/
)
kadm.ank(
'{}@{}'.format(
self.account,
self.username,
self.config['kerberos']['realm'].upper()
),
self.password
......@@ -353,7 +339,7 @@ https://documentation.its.umich.edu/node/339/
def upload_and_share_password(self):
passFilePath = '{}/{}-passwd.txt'.format(
self.config['general']['data_dir'],
self.account
self.username
)
exp_time = datetime.today() + timedelta(days=10)
with open(passFilePath, 'w') as passwordFile:
......@@ -449,14 +435,11 @@ The password for your requested shared account is:
self.last_name
)
self.account = re.sub(r'[^\w-]', '.', self.account).lower().strip()
self.group_alias = re.sub(
r'[^\w-]',
' ',
self.full_name
).replace('_', ' ').strip()
self.username = re.sub(r'[\_\s]', '.', self.account.strip()).lower()
self.groupname = re.sub(r'[\_\.]', ' ', self.account.strip()).lower()
self.alias = re.sub(r'[\_\.]', ' ', self.full_name).strip().lower()
self.email = '{}@{}'.format(
self.account,
self.username,
self.config['google']['domain']
)
......
......@@ -21,7 +21,6 @@ class SharedAccount():
self.config = config
self.box_exists = False
self.google_exists = False
self.group_data = {}
self.logger = logging.getLogger(__name__)
def check_box(self):
......@@ -59,13 +58,12 @@ class SharedAccount():
def check_mcommunity(self):
try:
group = mcommunity.Client(self.config['mcommunity'])
group.fetch_group(self.account)
self.group_data = group.group_data
except yaml.parser.ParserError as e:
self.logger.error(e, extra={'entity': self.account})
exit(2)
except Exception as e:
client = mcommunity.MCommClient(
self.config['mcommunity']['client_id'],
self.config['mcommunity']['secret']
)
self.mcomm_group = client.group(self.account)
except mcommunity.core.MCommError as e:
self.logger.error(e, extra={'entity': self.account})
exit(2)
......@@ -141,13 +139,7 @@ class SharedAccount():
except CalledProcessError as e:
self.logger.error(e.output, extra={'entity': self.account})
# Gather a list of account owners from MCommunity
owners = []
for _ in self.group_data['ownerDn']:
owner = _.split(',')[0].split('=')[1].strip()
owners.append(owner)
for owner in owners:
for owner in self.mcomm_group.owners:
try:
whatis = cmd([
self.config['google']['gam_command'],
......
......@@ -15,7 +15,7 @@ with open(os.path.join(os.path.dirname(__file__), 'README.md'), 'r') as f:
test_deps = [
'boxsdk[jwt]',
'kadmin',
'mcommunity @ git+https://gitlab.umich.edu/carleski/python-mcommunity.git',
'mcommunity @ git+https://gitlab.umich.edu/carleski/python-mcommunity.git@v2',
'pytest',
'pytest-pep8',
'pyyaml',
......
#!/usr/bin/env python
import json
import os
def get_testdata(base, name):
fname = os.path.join(
os.path.dirname(__file__),
'data/mock_kadmin/{}_{}.json'.format(base, name)
)
if os.path.exists(fname):
with open(fname, 'r') as f:
return f.read()
else:
return '{}'
class init_with_keytab():
def __init__(self, user, keytab):
return None
def ank(self, princ, passwd):
return True
def addprinc(self, princ, passwd):
return True
class getprinc():
def __init__(self, princ):
return None
def change_password(self, passwd):
return True
#!/usr/bin/env python
import json
import os
def get_testdata(base, name):
fname = os.path.join(
os.path.dirname(__file__),
'data/mock_mcomm/{}_{}.json'.format(base, name)
)
if os.path.exists(fname):
with open(fname, 'r') as f:
return f.read()
else:
raise Exception('hurk. blah.')
class Client:
group_reserved = False
def __init__(self):
return None
def _create_entity_ldap(self, entity):
return "cn={},ou=user groups,ou=groups,dc=umich,dc=edu".format(
entity
)
def fetch_group(self, group):
if self.group_reserved:
return True
else:
group_data = get_testdata('profile', group)
self.group_data = json.loads(
group_data
)['group'][0]
def reserve_group(self, group):
self.group_data = json.loads(
get_testdata('reserve_group', group)
)['group']
self.group_reserved = True
def add_group_owners(self, owners):
return True
def add_group_members(self, members):
return True
def remove_group_owners(self, owners):
return True
from . import mock_box
from . import mock_kadmin
from . import mock_mcomm
from collab_admin_kit import create_shared, reset_shared
from collab_admin_kit import create_shared
import json
import os
import pytest
import yaml
from unittest import mock
......@@ -26,10 +24,10 @@ sa = create_shared.SharedAccount(
sa.create_extra_attr()
@mock.patch('collab_admin_kit.reset_shared.boxsdk.JWTAuth.from_settings_file')
@mock.patch('boxsdk.JWTAuth.from_settings_file')
def test_create_box(mock_auth):
mock_auth.return_value(True)
target = 'collab_admin_kit.reset_shared.boxsdk.Client'
target = 'boxsdk.Client'
with mock.patch(target) as MockClient:
_client = mock_box.Client()
MockClient.return_value = _client
......@@ -41,42 +39,43 @@ def test_check_google():
sa.create_google()
def test_set_up_mcommunity_group_new():
@mock.patch('mcommunity.MCommClient')
def test_set_up_mcommunity_group_new(mock_mcomm, capsys):
sa.account = 'testnewsharedaccount'
target = 'collab_admin_kit.reset_shared.mcommunity.Client'
with mock.patch(target) as MockClient:
_client = mock_mcomm.Client()
MockClient.return_value = _client
sa.set_up_mcommunity_group()
def test_set_up_mcommunity_group_existing_owned():
mock_mcomm.return_value.group.return_value.exists = False
mock_mcomm.return_value.group.return_value.fetch_side_effect = [
None,
json.loads(open(
data_dir + '/mock_mcomm/profile_testgroup.json'
).read())
]
sa.set_up_mcommunity_group()
assert mock_mcomm.return_value.group.return_value.reserve.call_count
@mock.patch('mcommunity.MCommClient')
def test_set_up_mcommunity_group_existing_owned(mock_client):
sa.account = 'testsharedaccount'
target = 'collab_admin_kit.reset_shared.mcommunity.Client'
with mock.patch(target) as MockClient:
_client = mock_mcomm.Client()
MockClient.return_value = _client
sa.set_up_mcommunity_group()
mock_client.return_value.group.return_value.owners = ['api-control-group']
mock_client.return_value.group.return_value.fetch.return_value = json.loads(
open(data_dir + '/mock_mcomm/profile_testgroup.json').read()
)
sa.set_up_mcommunity_group()
@mock.patch('collab_admin_kit.create_shared.input')
def test_set_up_mcommunity_group_existing_unowned(mock_input):
@mock.patch('mcommunity.MCommClient')
def test_set_up_mcommunity_group_existing_unowned(mock_client, mock_input):
mock_input.return_value(True)
sa.account = 'unownedsharedaccount'
target = 'collab_admin_kit.reset_shared.mcommunity.Client'
with mock.patch(target) as MockClient:
_client = mock_mcomm.Client()
MockClient.return_value = _client
sa.set_up_mcommunity_group()
mock_client.return_value.group.return_value.fetch.return_value = json.loads(
open(data_dir + '/mock_mcomm/profile_testgroup.json').read()
)
sa.set_up_mcommunity_group()
def test_set_kerberos_password():
sa.account = 'testsharedaccount'
target = 'collab_admin_kit.reset_shared.kadmin.init_with_keytab'
with mock.patch(target) as MockClient:
_client = mock_kadmin.init_with_keytab('user', '/path/to/keytab')
MockClient.return_value = _client
sa.set_kerberos_password()
@mock.patch('kadmin.init_with_keytab')
def test_set_kerberos_password(kadmin):
sa.set_kerberos_password()
def test_upload_and_share_password():
......@@ -106,17 +105,9 @@ def test_valid_account_to_name_translation():
assert sa.first_name == "Test"
assert sa.last_name == "Account"
assert sa.full_name == "Test Account"
def test_invalid_account_to_name_translation(capsys):
with pytest.raises(SystemExit):
create_shared.SharedAccount(
{
'account': 'longaccountname',
'password': 'pass12word',
'owners': ['testuser1', 'testuser2'],
'first_name': None,
'last_name': None
},
yaml.load(open(config), Loader=yaml.BaseLoader)
)
assert ' ' not in sa.email
assert ' ' not in sa.username
assert '.' not in sa.groupname
assert '_' not in sa.groupname
assert '.' not in sa.alias
assert '_' not in sa.alias
from . import mock_box
from . import mock_kadmin
from . import mock_mcomm
from collab_admin_kit import reset_shared
import os
import pytest
import json
import yaml
from unittest import mock
......@@ -20,10 +18,10 @@ sa = reset_shared.SharedAccount(
)
@mock.patch('collab_admin_kit.reset_shared.boxsdk.JWTAuth.from_settings_file')
@mock.patch('boxsdk.JWTAuth.from_settings_file')
def test_check_box(mock_auth):
mock_auth.return_value(True)
with mock.patch('collab_admin_kit.reset_shared.boxsdk.Client') as MockClient:
with mock.patch('boxsdk.Client') as MockClient:
_client = mock_box.Client()
MockClient.return_value = _client
MockClient.return_value.users.return_value = _client.users()
......@@ -34,19 +32,19 @@ def test_check_google():
sa.check_google()
def test_check_mcommunity():
with mock.patch('collab_admin_kit.reset_shared.mcommunity.Client') as MockClient:
_client = mock_mcomm.Client()
MockClient.return_value = _client
sa.check_mcommunity()
@mock.patch('mcommunity.MCommClient')
def test_check_mcommunity(mock_mcomm):
mock_mcomm.return_value.fetch = json.loads(
open(data_dir + '/mock_mcomm/profile_testgroup.json').read()
)
sa.check_mcommunity()
assert sa.mcomm_group.dn
assert sa.mcomm_group.ownerDn
def test_set_kerberos_password():
target = 'collab_admin_kit.reset_shared.kadmin.init_with_keytab'
with mock.patch(target) as MockClient:
_client = mock_kadmin.init_with_keytab('user', '/path/to/keytab')
MockClient.return_value = _client
sa.set_kerberos_password()
@mock.patch('kadmin.init_with_keytab')
def test_set_kerberos_password(kadmin):
sa.set_kerberos_password()
def test_set_google_password():
......@@ -54,11 +52,4 @@ def test_set_google_password():
def test_upload_and_share():
sa.group = {
'ownerDn': [
'uid=testuser,ou=people,dc=ORG,dc=TLD',
'uid=testuser2,ou=people,dc=ORG,dc=TLD',
'cn=testgroup,ou=usergroups,ou=groups,dc=ORG,dc=TLD'
]
}
sa.upload_and_share_password()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment