群组

中亚硅谷IT技术组
分类:管理
楼主

LDAP-PYTHON文档

功能:
   1.用户登录 (LDAP账号密码登录)
   2.用户搜索(查询相关用户信息)
   3.用户信息修改(增加删除修改相关用户信息)
   4.用户密码修改


代码示范:
# coding=utf-8
import ldap


LDAP_CONFIG = {
   'HOST': 'ldap://172.16.0.250',        # ad服务器地址
   'HOST_SSL': 'ldaps://172.16.0.250:636',   # 使用加密协议的ad服务器地址
   'BASE': 'ou=ZY,dc=casvip,dc=com',         #  基准dn,也就是ad的搜索域
   'USER_DN': 'administrator@casvip.com',    #  bind_dn, ad的管理员账号,可以通过此账号对ad字段信息进行查询,修改。
   'PASSWORD': 'Admin123***',                #  管理员密码
   'LOGIN_ATTR': 'userPrincipalName'         #  登录所用的字段 此字段格式为id@casvip.com eg:17081111@casvip.com
}


class MyLdap(object):
   def __init__(self, server_url, user='', passwd=''):
       '''
       :param server_url:
       :param user:
       :param passwd:
       initail
       '''
       self.user = user
       self.passwd = passwd
       self.server_url = server_url
       self.ldap_obj = None


def ldap_connect(self):

"""
用户登录
       connenct ldap-server
       :return: success: true or erro : flase msg
       """

ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
       lp = ldap.initialize(self.server_url)
       lp.set_option(ldap.OPT_REFERRALS, 0)
       lp.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
       lp.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
       lp.set_option(ldap.OPT_X_TLS_DEMAND, True)
       lp.set_option(ldap.OPT_DEBUG_LEVEL, 255)

# try:
       #     conn.start_tls_s()
       # except ldap.LDAPError as exc:
       #     raise APIException(exc.message)
       if self.user and not self.passwd:
           return False, u"请输入LDAP密码"
       try:
           rest = lp.simple_bind_s(self.user, self.passwd)
       except ldap.SERVER_DOWN:
           return False, u"无法连接到LDAP"
       except ldap.INVALID_CREDENTIALS:
           return False, u"LDAP账号错误"
       except Exception, ex:
           # return False, type(ex)
           return False, u"其他错误"
       if rest[0] != 97:  # 97 表示success
           return False, u"帐号或密码错误"
       self.ldap_obj = lp
       return True

def ldap_search(self, base=LDAP_CONFIG['BASE'], keyword=None, rdn=LDAP_CONFIG['LOGIN_ATTR']):
       """
用户搜索
       base: 域 ou=ZY,dc=casvip,dc=com
       keyword: 搜索的用户
       rdn: userPrincipalName
       """
       scope = ldap.SCOPE_SUBTREE
       # filter = "%s=%s" % (rdn, keyword)
       retrieve_attributes = None
       try:
           result_id = self.ldap_obj.search(base, scope, filter, retrieve_attributes)
           result_type, result_data = self.ldap_obj.result(result_id)
           if not result_data:
               return False, []
       except ldap.LDAPError, error_message:
           return False, error_message
       return True, result_data


   def modify_user(self, dn, attr_list):
       """
修改用户信息
       MOD_ADD: 如果属性存在,这个属性可以有多个值,那么新值加进去,旧值保留
       MOD_DELETE :如果属性的值存在,值将被删除
       MOD_REPLACE :这个属性所有的旧值将会被删除,这个值被加进去

dn: cn=test, ou=magicstack,dc=test, dc=com
       attr_list: [( ldap.MOD_REPLACE, 'givenName', 'Francis' ),
                   ( ldap.MOD_ADD, 'cn', 'Frank Bacon' )
                  ]
       """
       try:
           result = self.ldap_obj.modify_s(dn, attr_list)
       except ldap.LDAPError, error_message:
           return False, error_message
       else:
           if result[0] == 103:
               return True, []
           else:
               return False, result[1]



   def get_dn(self, base=LDAP_CONFIG['BASE'], keyword=None, rdn=LDAP_CONFIG['LOGIN_ATTR']):
       '''
获取DN
       :param base:
       :param keyword:
       :param rdn:
       :return: user dn
       '''
       success, result = self.ldap_search(base, keyword, rdn)
       if not success:
           return []
       return result[0][0]


   def ldap_update_pass(self, new_pass, user=''):
       '''
用户密码修改
       password change need host_ssl_url and new_passwd
       :param new_pass:
       :param user:
       :return: True or False,msg
       '''

try:
           dn = self.get_dn(user)
           unicode_pass = unicode("\"" + str(new_pass) + "\"", "iso-8859-1")
           password_value = unicode_pass.encode("utf-16-le")
           add_pass = [(ldap.MOD_REPLACE, 'unicodePwd', [password_value])]
           self.ldap_obj.modify_s(dn, add_pass)
           self.login_out()
       except Exception, e:
           return False, e


   def login_out(self):
       self.ldap_obj.unbind_s()
       del self.ldap_obj
       return True



def main():
   ld = MyLdap(LDAP_CONFIG['HOST'], LDAP_CONFIG['USER_DN'], LDAP_CONFIG['PASSWORD'])
   print ld.ldap_connect() #连接成功 返回True
   search_result = ld.ldap_search(LDAP_CONFIG["BASE"], '123@casvip.com', rdn=LDAP_CONFIG["LOGIN_ATTR"])
   print search_result #返回搜索结果
   attr_list = [(ldap.MOD_REPLACE, 'userpassword', '123456')]
   dn = ld.get_dn(LDAP_CONFIG["BASE"], '123@casvip.com', rdn=LDAP_CONFIG["LOGIN_ATTR"])
   print ld.modify_user(dn, attr_list) #修改userpassword 这一属性


if __name__ == "__main__":
   main()


回复