Page MenuHomePhabricator
Paste P13715

admin_yaml_parse
ActivePublic

Authored by jbond on Jan 11 2021, 2:16 PM.
Tags
None
Referenced Files
F33995791: admin_yaml_parse
Jan 11 2021, 2:20 PM
F33995790: admin_yaml_parse
Jan 11 2021, 2:16 PM
Subscribers
None
#!/usr/bin/env python3
"""example script"""
import logging
from argparse import ArgumentParser
from pathlib import Path
import yaml
from ldap3 import Server, Connection
from ldap3.core.exceptions import LDAPAttributeError
def get_args():
    """Build the command line parser and parse the process arguments.

    Returns:
        `argparse.Namespace`: namespace holding ``admin_file``, ``config_file``
        and ``verbose`` (``None`` when ``-v`` is not given, otherwise the
        number of times it was passed)

    """
    cli = ArgumentParser(description=__doc__)
    # (short flag, long flag, default) for every option holding a file path
    path_options = (
        ('-a', '--admin-file', '~/git/puppet/modules/admin/data/data.yaml'),
        ('-c', '--config-file', '/etc/ldapvi.conf'),
    )
    for short_flag, long_flag, default_path in path_options:
        cli.add_argument(short_flag, long_flag, type=Path, default=default_path)
    cli.add_argument('-v', '--verbose', action='count')
    namespace = cli.parse_args()
    return namespace
def get_log_level(args_level):
    """Translate the ``-v`` counter into a level of the logging module.

    Parameters:
        args_level (int): how often ``-v`` was passed, ``None`` if it was not passed

    Returns:
        int: ``logging.ERROR`` for ``None``, ``logging.WARNING`` for 1,
        ``logging.INFO`` for 2 and ``logging.DEBUG`` for any other value

    """
    level_by_count = {
        None: logging.ERROR,
        1: logging.WARNING,
        2: logging.INFO,
        3: logging.DEBUG,
    }
    try:
        return level_by_count[args_level]
    except KeyError:
        # any count that is not listed gets the most verbose level
        return logging.DEBUG
def get_config(config_file):
    """Parse the ldapvi.conf file and return an ldap config dict.

    Only ``key: value`` lines whose key is one of ``host``, ``user``, ``base``
    or ``password`` are kept, every other line is ignored.  Surrounding
    whitespace is removed from both the key and the value.

    Arguments:
        config_file (`pathlib.Path`): path to the ldapvi.conf file

    Returns:
        dict: dict representing the parsed ldap config

    """
    wanted_keys = ('host', 'user', 'base', 'password')
    config = {}
    for line in config_file.read_text().splitlines():
        # Split on the first colon only: values such as ldaps://host:636 or a
        # password can contain further colons which belong to the value.
        key, separator, value = line.partition(':')
        if not separator:
            continue
        # strip before comparing so that an indented key is still recognised
        key = key.strip()
        if key not in wanted_keys:
            continue
        config[key] = value.strip()
    # keep the bind password out of the log output
    loggable = {
        key: '<redacted>' if key == 'password' else value
        for key, value in config.items()
    }
    logging.debug('config: %s', loggable)
    return config
def get_ldap_conn(config):
    """Create the ldap connection described by the config dict.

    Arguments:
        config (dict): ldap config, the ``host``, ``user`` and ``password``
            keys are read

    Returns:
        `ldap3.Connection`: connection to ``host`` (``use_ssl=True``) created
        with ``auto_bind=True`` for the configured user

    """
    ldap_server = Server(config['host'], use_ssl=True)
    return Connection(
        ldap_server,
        user=config['user'],
        password=config['password'],
        auto_bind=True,
    )
def get_ldap_user_groups(ldap_conn, user, base_dn):
    """Look up the ldap groups of a user.

    The directory is searched below ``base_dn`` with the filter
    ``(uid=<user>)`` and the ``memberOf`` attribute of the entry is reduced to
    short group names.

    Arguments:
        ldap_conn (`ldap3.Connection`): An ldap connection object
        user (str): the uid to search for
        base_dn (str): the search base

    Returns:
        list: short names of the groups the user is present in, empty when the
        search reports no result or ``LDAPAttributeError`` is raised while
        reading ``memberOf``

    Raises:
        SystemExit: with code 1 if the search yields more than one entry

    """
    ldap_filter = '(uid={})'.format(user)
    if not ldap_conn.search(base_dn, ldap_filter, attributes=['memberOf']):
        logging.warning('%s: not in ldap', user)
        return []

    matches = ldap_conn.entries
    if len(matches) > 1:
        logging.error('%s: more then one user found', user)
        logging.debug(matches)
        raise SystemExit(1)

    try:
        # keep only the value of the leading component of every group dn, e.g.
        # 'cn=nda,ou=groups,dc=wikimedia,dc=org' -> 'nda'
        return [
            group_dn.split(',')[0].split('=')[1]
            for group_dn in matches[0].memberOf.values
        ]
    except LDAPAttributeError:
        logging.debug('%s: has no memberOf attribute', user)
        return []
def get_posix_groups(user, groups):
    """Parse admin data and return a list of groups for a specific user.

    The group named ``absent`` is never reported.  A group that has no
    parameters, no ``members`` key or an empty ``members`` value is treated as
    a group without members instead of raising an error.

    Arguments:
        user (str): the username
        groups (dict): the groups object from admin.yaml, mapping each group
            name to its parameters

    Returns:
        list: a list of groups the user is in, in the order of ``groups``

    """
    posix_groups = []
    for group, params in groups.items():
        if group == 'absent':
            continue
        # an empty yaml value is loaded as None, fall back to "no members"
        members = (params or {}).get('members') or ()
        if user in members:
            posix_groups.append(group)
    return posix_groups
def main():
    """Main entry point.

    Report every user with ``ensure: present`` in the admin yaml file that is
    not a member of any of the privileged ldap groups, together with the ldap
    and posix groups the user is in.

    Returns:
        int: an int representing the exit code

    """
    args = get_args()
    privileged_groups = ('nda', 'wmf', 'wmde')
    logging.basicConfig(level=get_log_level(args.verbose))
    config = get_config(args.config_file)
    admin_file = args.admin_file.expanduser()
    ldap_conn = get_ldap_conn(config)
    try:
        admin = yaml.safe_load(admin_file.read_text())
        groupless_shell_users = {}
        for user, params in admin['users'].items():
            if params['ensure'] != 'present':
                continue
            groups = get_ldap_user_groups(ldap_conn, user, config['base'])
            if any(group in groups for group in privileged_groups):
                logging.debug('%s already in an appropriate group', user)
                continue
            groupless_shell_users[user] = groups
    finally:
        # release the ldap connection even if loading the yaml file or one of
        # the lookups fails (get_ldap_user_groups may raise SystemExit)
        ldap_conn.unbind()

    print('The following users are not in any of the privalaged ldap groups')
    for user, groups in groupless_shell_users.items():
        print('\t{}: ldap_groups({}) posix groups({})'.format(
            user, ','.join(groups), ','.join(get_posix_groups(user, admin['groups']))))
    # explicit success code, matches the documented int return value
    return 0
# Only run when executed as a script; the value returned by main() becomes the
# exit status of the process.
if __name__ == '__main__':
    exit_code = main()
    raise SystemExit(exit_code)