#!/usr/bin/env python3 import argparse import boxsdk import kadmin import logging import logging.handlers import mcommunity import os import re import string import yaml from .core import arg_prompt from secrets import choice from subprocess import check_output as cmd from subprocess import CalledProcessError from subprocess import STDOUT class SharedAccount(): def __init__(self, args, config): self.__dict__.update(args) self.config = config self.box_exists = False self.google_exists = False self.logger = logging.getLogger(__name__) self.groupname = re.sub(r'[\_\.]', ' ', self.account.strip()).lower() self.username = re.sub(r'[\_\s]', '.', self.account.strip()).lower() def check_box(self): try: auth = boxsdk.JWTAuth.from_settings_file( self.config['box']['auth_file'] ) boxClient = boxsdk.Client(auth) boxUsers = boxClient.users(filter_term=self.username) for _ in boxUsers: login = _['login'].split('@')[0] if self.username.lower() == login.lower(): self.box_exists = True break return False except IOError as e: self.logger.error(e, extra={'entity': self.account}) except boxsdk.exception.BoxAPIException as e: self.logger.error(e, extra={'entity': self.account}) def check_google(self): try: output = cmd([ self.config['google']['gam_command'], 'whatis', self.username, 'userview' ], stderr=STDOUT ) if 'is a user' in output.decode('UTF-8'): self.google_exists = True return False except CalledProcessError: return False def check_mcommunity(self): try: client = mcommunity.MCommClient( self.config['mcommunity']['client_id'], self.config['mcommunity']['secret'] ) self.mcomm_group = client.group(self.groupname) assert self.mcomm_group.dn except mcommunity.core.MCommError as e: self.logger.error(e, extra={'entity': self.account}) exit(2) except AssertionError: self.logger.error( 'Unable to find matching group in MCommunity', extra={'entity': self.account} ) exit(2) def set_kerberos_password(self): try: admin = '{}@{}'.format( self.config['kerberos']['admin'], self.config['kerberos']['realm'].upper() ) kadm = kadmin.init_with_keytab( admin, self.config['kerberos']['keytab'] ) principal = '{}@{}'.format( self.username, self.config['kerberos']['realm'].upper() ) princ = kadm.getprinc(principal) if not princ: self.logger.warning( 'Kerberos principal does not exist. Creating.', extra={'entity': self.username} ) kadm.addprinc(principal, self.password) else: princ.change_password(self.password) except IOError as e: self.logger.error(e, extra={'entity': self.account}) exit(2) except kadmin.KAdminError as e: self.logger.error(e, extra={'entity': self.account}) exit(2) def set_google_password(self): try: cmd([ self.config['google']['gam_command'], 'update', 'user', self.username, 'password', self.password ], stderr=STDOUT ) except CalledProcessError as e: self.logger.error(e.output, extra={'entity': self.account}) def upload_and_share_password(self): try: passFilePath = '{}/{}-passwd.txt'.format( self.config['general']['data_dir'], self.account ) with open(passFilePath, 'w') as passwordFile: passwordFile.write(self.password) upload_output = cmd([ self.config['google']['gam_command'], 'user', self.config['google']['admin_account'], 'add', 'drivefile', 'localfile', passFilePath, 'parentname', 'Shared Account Passwords' ], stderr=STDOUT) self.file_id = re.search( r'.*\((.*)\)', upload_output.decode('UTF-8') ).groups()[0] except CalledProcessError as e: self.logger.error(e.output, extra={'entity': self.account}) for owner in self.mcomm_group.owners: try: whatis = cmd([ self.config['google']['gam_command'], 'whatis', owner ], stderr=STDOUT) except CalledProcessError: self.logger.warning( '{} does not seem to exist in Google.'.format(owner), extra={'entity': self.account} ) continue if b'is a user' in whatis: entity_type = 'user' elif b'is a group' in whatis: entity_type = 'group' try: cmd([ self.config['google']['gam_command'], 'user', self.config['google']['admin_account'], 'add', 'drivefileacl', self.file_id, entity_type, owner, 'role', 'reader' ], stderr=STDOUT) except CalledProcessError: self.logger.warning( 'Unable to share password with {}'.format(owner), extra={'entity': self.account} ) continue os.remove(passFilePath) def print_canned_text(self): print(''' Hello, The password for your shared account has been changed. The new password for this account is stored in your Google Drive space, and can be found at the following link: https://docs.google.com/a/{}/file/d/{}. Please note that this file will expire automatically in ten (10) days, and has been shared with any additional owners of the MCommunity group associated with this account. Coordinate with other account owners before making changes to this password. For information on setting this password to a value of your choosing, please see our documentation found at the following link: https://documentation.its.umich.edu/?q=node/339#password If you are still unable to log into your shared account, or if you have any additional questions or concerns, simply reply to this message and your request will be automatically re-opened. '''.format(self.config['google']['domain'], self.file_id)) def main(): helptext = '''example: collab-reset-shared -a umcollab -p pass12word collab-reset-shared --a umcollab --password pass12word ''' parser = argparse.ArgumentParser( description='Resets the password in Google and' 'kerberos for a shared account.', epilog=helptext, formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument( '--account', '-a', help='The target account name.', ) parser.add_argument( '--password', '-p', help='The new password to use.', ) parser.add_argument( '--config', '-c', help='The CAK config to use.', default='/etc/collab-admin-kit.yml' ) args = parser.parse_args() # Argument prompt fallback if not args.account: args.account = arg_prompt( 'Target shared account to reset password for' ) args.password = arg_prompt( '(optional) Password to set', default='' ) # Argument sanity check _required = ['account'] for r in _required: try: if isinstance(r, str): assert getattr(args, r) if isinstance(r, tuple): assert getattr(args, r[0]) or getattr(args, r[1]) except AssertionError: print('Missing required argument {}'.format(r)) # Open the CAK Config with open(args.config) as stream: config = yaml.load(stream, Loader=yaml.BaseLoader) # Get the root logger and set the debug level logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # Create a syslog handler, set format, and associate. sh = logging.handlers.SysLogHandler( address='/dev/log', facility=config['general']['log_facility'] ) formatter = logging.Formatter(config['general']['log_format']) sh.setFormatter(formatter) logger.addHandler(sh) # Create a console handler, set format, and associate. ch = logging.StreamHandler() formatter = logging.Formatter( config['general']['console_format'], config['general']['date_format'] ) ch.setFormatter(formatter) logger.addHandler(ch) if not args.password: pool = string.ascii_letters + string.digits + '!@#$%&*' args.password = ''.join(choice(pool) for i in range(12)) sa = SharedAccount(vars(args), config) sa.check_mcommunity() sa.check_box() sa.check_google() # Exit out if a shared account doesn't exist. if not sa.google_exists and not sa.box_exists: logger.error( 'Entity does not exist as a shared account', extra={'entity': args.account} ) exit(2) # Set the password in kerberos sa.set_kerberos_password() # Set in Google if available: if sa.google_exists: sa.set_google_password() # Upload password doc and share to group owners sa.upload_and_share_password() # Print canned text to send to the user. sa.print_canned_text() if __name__ == '__main__': main()