# -*- coding: utf-8 -*- import logging import json import traceback from ldap3 import ( Server, Connection, ALL ) from functions.constants import ( DISTINGISH_NAME, LDAP_PASSWARD, BASE_NAME, PARAM_PSERVERNAME, ) class ConnectLDAP: """ LDAP接続クラス Attributes ---------- ctrid : str コンテナ名 Notes ----- コンテナ名は事前にバリデーションする """ def __init__(self, ctrid): """ Parameters ---------- ctrid : str コンテナ名 """ self.ctrid = ctrid # self.ctrsinfo = self.road_ctrs() self.connector = self.ldap_connection() def __enter__(self): """ with構文の前処理 """ return self def __exit__(self, exc_type, exc_value, traceback): """ with構文の後処理 """ try: if self.connector: self.connector.unbind() except Exception as e: print(json.dumps({"error": f'{str(e)}'})) # def road_ctrs(self): # """ # ctr.jsonのデータを読み込む関数 # """ # try: # with open(CTR_JSON % self.ctrid, 'r') as opened_file: # ctrsinfo = json.load(opened_file) # return ctrsinfo # except Exception as e: # logging.error(json.dumps({"error": f'{traceback.format_exc()}'})) # return False def ldap_connection(self): """ LDAPコネクションを確立する関数 """ try: server = Server("localhost", get_info=ALL) connector = Connection(server, DISTINGISH_NAME, password=LDAP_PASSWARD, auto_bind=True) return connector except Exception as e: logging.error(json.dumps({"error": f'{traceback.format_exc()}'})) return False def validate_user(self, uid): """ 引数のユーザの存在確認 Parameters ---------- uid : str ユーザ名 returns ------- bool 存在する場合True, しない場合False """ try: ret = self.connector.search( search_base=BASE_NAME, search_filter=f'(uid={uid})' ) return ret except Exception as e: logging.error(json.dumps({"input": { "uid": f'{uid}' }, "error": f'{traceback.format_exc()}'})) return None def check_admin(self, uid): """ 引数のユーザの権限を確認 Parameters ---------- uid : str ユーザ名 returns ------- string サーバ管理者: server_admin, ドメイン管理者: dom_admin, 一般ユーザ: user """ try: self.connector.search( search_base=BASE_NAME, search_filter=f'(uid={uid})', attributes=["uid", "temphostname", "objectClass"] ) if not self.connector.response: return None if ("vserver" not in self.connector.response[0]["attributes"]["objectClass"]): return "user" if not self.connector.response[0]["attributes"]["temphostname"]: return "dom_admin" else: return "server_admin" except Exception as e: logging.error(json.dumps({"input": { "uid": f'{uid}' }, "error": f'{traceback.format_exc()}'})) return None def search(self, search_filter, attributes, search_base=BASE_NAME): """ search_filetrとattributesを指定して単一ユーザに関する情報を取得する Parameters ---------- search_filter : str 検索フィルタルール attributes : list アトリビュートリスト Returns ------- dict 検索結果のアトリビュートに関する辞書 """ try: self.connector.search( search_base=search_base, search_filter=search_filter, attributes=attributes ) if self.connector.response: return self.connector.response else: return None except Exception as e: logging.error(json.dumps({"input": { "search_filter": f'{search_filter}', "attributes": f'{attributes}' }, "error": f'{traceback.format_exc()}'})) return None def custom_search(self, **kwargs): """ 任意のルールでLDAP検索を行う Parameters ---------- **kwargs : dict 検索ルール Returns ------- dict 検索結果のアトリビュートに関する辞書 """ try: ret = self.connector.extend.standard.paged_search( search_base=BASE_NAME, **kwargs ) return self.connector.response except Exception as e: logging.error(json.dumps({ "input": f'{kwargs}', "error": f'{traceback.format_exc()}'})) return None def add(self, dn, entry): """ LDAPにエントリを挿入する Parameters ---------- dn : str Distinguished Name **kwargs : dict 挿入エントリ Returns ------- bool 成功した場合True、失敗した場合False """ try: ret = self.connector.add(dn, attributes=entry) if not ret: logging.error(json.dumps({ "input": {"dn": f'{dn}', "entry": f'{entry}'}, "error": f'{self.connector.result}'})) return ret except Exception as e: logging.error(json.dumps({ "input": {"dn": f'{dn}', "entry": f'{entry}'}, "error": f'{traceback.format_exc()}'})) return False def delete(self, dn): """ LDAPのエントリを削除する Parameters ---------- dn : str Distinguished Name Returns ------- bool 成功した場合True、失敗した場合False """ try: ret = self.connector.delete(dn) if not ret: logging.error(json.dumps({ "input": {"dn": f'{dn}'}, "error": f'{self.connector.result}'})) return ret except Exception as e: logging.error(json.dumps({ "input": {"dn", f'{dn}'}, "error": f'{traceback.format_exc()}'})) return False # def modify(self, dn, mod_list): # """ # LDAPアトリビュートの編集(追加、削除、修正)を行う # Parameters # ---------- # dn : str # Distinguished Name # mod_dict : list # 編集処理を記載したlist, listの中身は以下の様式のtuple # 1->"mod", "add", "del" 2->アトリビュート 3->変更後の値(リスト) # ex) [("mod", "attiribute", ['value'])] # Returns # ------- # bool # 成功した場合True、失敗した場合False # Notes # ----- # addのときすでにattributesに値があるとエラーになる # mod, delのときattributesに値がないとエラーになる # """ # try: # mod_dict = {} # for i in mod_list: # mod_const = "" # if i[0] == "mod": # mod_const = MODIFY_REPLACE # elif i[0] == "add": # mod_const = MODIFY_ADD # elif i[0] == "del": # mod_const = MODIFY_DELETE # else: # logging.error(json.dumps({ # "input": {"dn": f'{dn}', "mod_dict": f'{mod_dict}'}, # "error": f'{E_00_006}'})) # return False # mod_dict.update({i[1]: [(mod_const, i[2])]}) # ret = self.connector.modify(dn, mod_dict) # if not ret: # logging.error(json.dumps({ # "input": {"dn": f'{dn}', "mod_dict": f'{mod_dict}'}, # "error": f'{self.connector.result}'})) # return ret # except Exception as e: # logging.error(json.dumps({ # "input": {"dn": f'{dn}', "mod_dict": f'{mod_dict}'}, # "error": f'{traceback.format_exc()}'})) # return False def modify_dn(self, before_dn, relative_dn, new_superior=None): """ DNの付け替えを行う Parameters ---------- before_dn : str 変更前のDN relative_dn : str 変更後の相対的DN new_superior : str(任意) 変更後のエントリーの親DN Returns ------- bool 成功した場合True、失敗した場合False Notes ----- new_superiorは変更がない場合は指定しないようにする """ try: ret = self.connector.modify_dn( before_dn, relative_dn, new_superior=new_superior ) if not ret: logging.error(json.dumps({ "input": { "before_dn": f'{before_dn}', "relative_dn": f'{relative_dn}', "new_superior": f'{new_superior}' }, "error": f'{self.connector.result}'})) return ret except Exception as e: logging.error(json.dumps({ "input": { "before_dn": f'{before_dn}', "relative_dn": f'{relative_dn}', "new_superior": f'{new_superior}' }, "error": f'{traceback.format_exc()}'})) return False def list_domains(self, uid=None): """ 管理対象もしくはすべてのドメイン一覧を返す Parameters ---------- uid : str ユーザ名(vservername) Returns ------- list 検索されたドメイン一覧 Notes ----- サーバ管理者が管理するドメイン一覧を表示する場合uidにテンポラリドメインを指定する """ domains_list = [] try: if uid: filter_rule = f'(&(objectClass=domainDetail)'\ f'(vservername={uid}))' else: filter_rule = f'(objectClass=domainDetail)' ret = self.connector.search( search_base=BASE_NAME, search_filter=filter_rule, attributes=["domainname"] ) if ret: for i in self.connector.response: domains_list.append(i["attributes"]["domainname"]) return domains_list except Exception as e: logging.error(json.dumps({"input": { "uid": f'{uid}' }, "error": f'{traceback.format_exc()}'})) return [] def list_users(self, uid=None): """ 管理対象もしくはすべてのユーザ一覧を返す Parameters ---------- uid : str ユーザ名(vservername) Returns ------- list 検索されたドメイン一覧 Notes ----- サーバ管理者が管理するドメイン一覧を表示する場合uidにテンポラリドメインを指定する """ users_list = [] try: if uid: search_dn = f"vservername={uid},{PARAM_PSERVERNAME}" else: search_dn = f'{BASE_NAME}' ret = self.connector.search( search_base=search_dn, search_filter=f'(uid=*)', attributes=["uid"] ) if ret: for i in self.connector.response: # oemrootユーザは除外 if i["attributes"]["uid"][0] == "oemroot": continue users_list.append(i["attributes"]["uid"][0]) return users_list except Exception as e: logging.error(json.dumps({"input": { "uid": f'{uid}' }, "error": f'{traceback.format_exc()}'})) return [] def list_domadmins(self): """ すべてのドメイン管理者一覧を返す Returns ------- list 検索されたドメイン管理者一覧 """ domadmin_list = [] try: ret = self.connector.search( search_base=BASE_NAME, search_filter=f'(objectClass=vserver)', attributes=["uid"] ) if ret: for i in self.connector.response: domadmin_list.append(i["attributes"]["uid"][0]) return domadmin_list except Exception as e: logging.error(json.dumps({"error": f'{traceback.format_exc()}'})) return [] def get_max_id_gid(self, increament=True): """ id, gidの最大値を返す Parameters ---------- increment : bool 最大値に+1した数値を返すかどうかのフラグ Returns ------- tuple uid, gid """ uid_list = [] gid_list = [] try: ret = self.connector.search( search_base=BASE_NAME, search_filter=f'(uidNumber=*)', attributes=["uidNumber"] ) if not self.connector.entries: return (None, None) for i in self.connector.response: uid_list.append(int(i["attributes"]["uidNumber"])) ret = self.connector.search( search_base=BASE_NAME, search_filter=f'(gidNumber=*)', attributes=["gidNumber"] ) if not self.connector.entries: return (None, None) for i in self.connector.response: gid_list.append(int(i["attributes"]["gidNumber"])) if increament: return (max(uid_list)+1, max(gid_list)+1) else: return (max(uid_list), max(gid_list)) except Exception as e: logging.error(json.dumps({"input": { "increament": f'{increament}' }, "error": f'{traceback.format_exc()}'})) return (None, None) def get_gid_name(self, uid): """ 指定したuidのグループ名を取得する Parameters ---------- uid : int uid Return ------ str グループ名 Notes ----- MWPの環境でcreate_business_userなどを利用する場合明示的にbusinessをTrueにすることで正しくグループを返すことができる """ try: ret = self.connector.search( search_base=BASE_NAME, search_filter=f'(&(uid={uid})(objectClass=siteUser))', attributes=["gidNumber", "objectClass"] ) if not self.connector.entries: return None if "posixAccount" not in self.connector.response[0]["attributes"]["objectClass"]: return "vuser" gid_num = self.connector.response[0]["attributes"]["gidNumber"] ret = self.connector.search( search_base=BASE_NAME, search_filter=f'(&(gidNumber={gid_num})(objectClass=posixGroup))', attributes=["cn"] ) if not self.connector.entries: return self.ctrsinfo.get("server_admin") else: return self.connector.response[0]["attributes"]["cn"][0] except Exception as e: logging.error(json.dumps({"input": {"uid": f'{uid}'}, "error": f'{traceback.format_exc()}'})) return None