attempt new login method
This commit is contained in:
37
app.py
37
app.py
@ -47,14 +47,26 @@ def send_webhook(device, status, diagnostics):
|
||||
class UnifiAPI:
|
||||
def __init__(self, config):
|
||||
self.base_url = config['unifi']['controller']
|
||||
self.headers = {
|
||||
'X-API-KEY': config['unifi']['api_key'],
|
||||
'Accept': 'application/json',
|
||||
'Authorization': f"Bearer {config['unifi']['api_key']}"
|
||||
}
|
||||
self.api_key = config['unifi']['api_key']
|
||||
self.site_id = config['unifi']['site_id']
|
||||
self.session = requests.Session()
|
||||
self.session.verify = False
|
||||
self.login()
|
||||
|
||||
def login(self):
|
||||
"""Login to UDM Pro using the API key"""
|
||||
login_url = f"{self.base_url}/api/auth/login"
|
||||
response = self.session.post(login_url, json={
|
||||
"username": "admin",
|
||||
"apiKey": self.api_key
|
||||
})
|
||||
response.raise_for_status()
|
||||
# Store CSRF token for subsequent requests
|
||||
self.csrf_token = response.headers.get('X-CSRF-Token')
|
||||
self.headers = {
|
||||
'X-CSRF-Token': self.csrf_token,
|
||||
'Accept': 'application/json'
|
||||
}
|
||||
|
||||
def get_all_devices(self):
|
||||
logger.info(f"Fetching devices from UniFi controller at {self.base_url}")
|
||||
@ -70,15 +82,12 @@ class UnifiAPI:
|
||||
logger.error(f"Failed to fetch devices: {str(e)}")
|
||||
return []
|
||||
|
||||
def get_device_details(self, ip_address):
|
||||
logger.debug(f"Looking for device with IP: {ip_address}")
|
||||
devices = self.get_all_devices()
|
||||
for device in devices:
|
||||
if device.get('ipAddress') == ip_address:
|
||||
logger.debug(f"Found device details: {device}")
|
||||
return device
|
||||
logger.debug(f"No device found with IP: {ip_address}")
|
||||
return None
|
||||
def get_device_details(self, device_id):
|
||||
"""Get device details using the proxy/network prefix for UDM Pro"""
|
||||
url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/device/{device_id}"
|
||||
response = self.session.get(url, headers=self.headers)
|
||||
response.raise_for_status()
|
||||
return response.json()['data'][0]
|
||||
|
||||
def get_device_diagnostics(self, device):
|
||||
details = self.get_device_details(device['ip'])
|
||||
|
||||
Reference in New Issue
Block a user