Skip to content
Snippets Groups Projects
Select Git revision
  • main default protected
  • threads
2 results

test.py

Blame
  • test.py 5.25 KiB
    from netmiko import ConnectHandler
    from getpass import getpass
    import requests
    import urllib3
    import re 
    import time
    import getpass
    
    
    # The REST login further down is sent with verify=False, which makes urllib3
    # emit an InsecureRequestWarning for each such request; silence that
    # warning category for the whole run.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    ###########################################################################
    # Interactive selection of the campus and of the port-access role.
    #
    # Produces two names used by the rest of the script:
    #   file       - list of switch addresses read from the campus inventory file
    #   policyrole - the role name searched for on each switch
    #
    # An unknown menu choice stops the script immediately with a clear message
    # instead of failing later with a NameError on `file` / `policyrole`.
    campus_files = {
        "1": "unlCitySwitches-cx.txt",
        "2": "unlEastSwitches-cx.txt",
        "3": "unoSwitches-cx.txt",
        "4": "unkSwitches-cx.txt",
    }
    role_names = {
        "1": "wired_l1_game_dur-3142-2",
        "2": "wired_l2_print_dur-3134-6",
        "3": "Allow-Policy_wired_ap_dur-3095-4",
        "4": "wired_l2_iot_dur-3147-2",
        "5": "wired_l2_shared_dur-3168-2",
        "6": "wired_cctv_dur-3107-8",
        # Menu entry 7 is labelled "test" in the prompt below.
        "7": "application-failed",
    }

    print(" 1 = UNL-City \n 2 = UNL-East \n 3 = UNO \n 4 = UNK \n Select Campus:")
    # Strip so that a stray space or tab around the digit still selects an entry.
    campus = input().strip()
    print(f"Campus Selected: {campus}")
    if campus not in campus_files:
        raise SystemExit(f"Unknown campus selection: {campus!r}")
    print("----------------------------------")
    print("----------------------------------")
    print(" 1 = wired_l1_game_dur-3142-2 \n 2 = wired_l2_print_dur-3134-6 \n 3 = Allow-Policy_wired_ap_dur-3095-4 \n 4 = wired_l2_iot_dur-3147-2 \n 5 = wired_l2_shared_dur-3168-2 \n 6 = wired_cctv_dur-3107-8 \n 7 = test \n Select Role:")
    role = input().strip()
    if role not in role_names:
        raise SystemExit(f"Unknown role selection: {role!r}")

    # Read the inventory with a context manager so the file handle is closed,
    # and drop blank lines so they are not treated as (empty) host names.
    with open(campus_files[campus], 'r') as switch_list:
        file = [line.strip() for line in switch_list if line.strip()]

    policyrole = role_names[role]
    ##########################################################################
    # Prompt for the account used for both the SSH session and the REST login.
    # The password is read with getpass so it is not echoed to the terminal.
    print("Enter Username:")
    Ausername = input()
    print("Enter Password:")
    Apassword = getpass.getpass()
    # Form fields for the REST login request. The values are plain strings;
    # the previous version wrapped each one in a set literal, which only
    # worked because requests iterates over non-string form values.
    creds = {"username": Ausername, "password": Apassword}
    
    
    def get_logs(self, port=None):
        """Re-apply the port configuration over REST if the port carries the selected role.

        Logs in to the switch REST API, reads the port-access clients of one
        interface and, when ``policyrole`` appears in that response, PUTs the
        interface configuration twice: first with admin "down" and, one second
        later, with admin "up". Both PUTs also send the STP, AAA precedence /
        priority, client-limit and loop-protect fields contained in the payload.
        The REST session is logged out afterwards, also when a request fails.

        Args:
            self: Address of the switch (despite the name this is not an
                instance; the parameter name is kept for compatibility).
            port: Sequence of the three interface components, e.g.
                ``["1", "1", "12"]`` for ``1/1/12``. When omitted, the
                module-level ``to2`` is used, as in earlier versions.

        Relies on the module-level ``session``, ``creds`` and ``policyrole``.
        Request errors are printed, not raised, so one failing port does not
        stop the run.
        """
        if port is None:
            port = to2
        rest_root = f"https://{self}/rest"
        interface_url = f"{rest_root}/v10.04/system/interfaces/{port[0]}%2F{port[1]}%2F{port[2]}"

        # Every request passes verify=False, matching the login request; the
        # original only did so for the login, so later calls to the same switch
        # would fail certificate verification.
        try:
            login = session.post(f"{rest_root}/v1/login", data=creds, verify=False)
            # Check the login response itself (previously an unbound `response`).
            login.raise_for_status()
        except requests.exceptions.RequestException as error:
            print(error)
            return

        try:
            get_log = session.get(
                f"{interface_url}/port_access_clients?attributes=applied_role&depth=2",
                verify=False,
            )
            # The role is looked for anywhere in the textual form of the JSON payload.
            if policyrole in f"{get_log.json()}":
                headers = {
                    'accept': '*/*',
                    'Content-Type': 'application/json',
                }
                for up_down in ("down", "up"):
                    data = f'{{"stp_config":{{"admin_edge_port_enable":true,"bpdu_guard_enable":true}},"user_config":{{"admin":"{up_down}"}},"aaa_auth_precedence": {{"1": "mac-auth","2": "dot1x"}},"aaa_auth_priority": {{"1": "dot1x","2": "mac-auth"}},"port_access_clients_limit": 15,"loop_protect_enable":true}}'
                    session.put(interface_url, headers=headers, data=data, verify=False)
                    if up_down == "down":
                        # Short pause between taking the port down and bringing it back up.
                        time.sleep(1)
            else:
                print("Not Found!")
                print(f"No port detected with role {policyrole} ")
        except (requests.exceptions.RequestException, ValueError) as error:
            # ValueError covers a response body that is not valid JSON.
            print(error)
        finally:
            # Always release the REST session on the switch once logged in.
            try:
                logout = session.post(f"{rest_root}/v1/logout", verify=False)
                print(f"Logout Code from Switch:{logout.status_code}")
            except requests.exceptions.RequestException as error:
                print(error)

    # Visit every switch of the selected campus: list the ports that currently
    # have the selected role over SSH, then handle each of them through get_logs.
    for selectIP in file:
        ip_add = selectIP
        print(ip_add)

        try:
            net_connect = ConnectHandler(
                device_type="aruba_procurve",
                host=ip_add,
                username=Ausername,
                password=Apassword,
            )
        except Exception as error:
            # Per-switch boundary: any connection failure skips this switch only.
            print(f"Error in connection...skipping: {error}")
            continue

        try:
            # One HTTP session per switch; get_logs reads it as a module-level name.
            with requests.session() as session:
                output = net_connect.send_command(f"show port-access clients role {policyrole}")
                print(f"Ports that have role: {policyrole}:")

                for line in output.split("\n"):
                    # Drop the first two characters of the row and keep the first
                    # space-separated field, which is where the port is expected.
                    port_name = line[2:].split(" ")[0]
                    if re.search(r"[\d]+/[\d]/[\d]+", port_name):
                        print(port_name)
                        to2 = port_name.split("/")
                        get_logs(ip_add, to2)
        finally:
            # Close the SSH session even if a command or REST call raised.
            net_connect.disconnect()