diff --git a/pyhon/connection/api.py b/pyhon/connection/api.py index a9fab02..3af519a 100644 --- a/pyhon/connection/api.py +++ b/pyhon/connection/api.py @@ -9,7 +9,8 @@ from aiohttp import ClientSession from pyhon import const, exceptions from pyhon.appliance import HonAppliance from pyhon.connection.auth import HonAuth -from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler +from pyhon.connection.handler.hon import HonConnectionHandler +from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler _LOGGER = logging.getLogger() diff --git a/pyhon/connection/auth.py b/pyhon/connection/auth.py index 96bfd83..715d7ef 100644 --- a/pyhon/connection/auth.py +++ b/pyhon/connection/auth.py @@ -5,13 +5,14 @@ import secrets import urllib from datetime import datetime, timedelta from pprint import pformat -from typing import List, Tuple from urllib import parse from urllib.parse import quote +from aiohttp import ClientResponse from yarl import URL from pyhon import const, exceptions +from pyhon.connection.handler.auth import HonAuthConnectionHandler _LOGGER = logging.getLogger(__name__) @@ -22,6 +23,7 @@ class HonAuth: def __init__(self, session, email, password, device) -> None: self._session = session + self._request = HonAuthConnectionHandler(session) self._email = email self._password = password self._access_token = "" @@ -29,26 +31,25 @@ class HonAuth: self._cognito_token = "" self._id_token = "" self._device = device - self._called_urls: List[Tuple[int, str]] = [] self._expires: datetime = datetime.utcnow() @property - def cognito_token(self): + def cognito_token(self) -> str: return self._cognito_token @property - def id_token(self): + def id_token(self) -> str: return self._id_token @property - def access_token(self): + def access_token(self) -> str: return self._access_token @property - def refresh_token(self): + def refresh_token(self) -> str: return self._refresh_token - def _check_token_expiration(self, hours): + def _check_token_expiration(self, hours: int) -> bool: return datetime.utcnow() >= self._expires + timedelta(hours=hours) @property @@ -59,34 +60,38 @@ class HonAuth: def token_expires_soon(self) -> bool: return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS) - async def _error_logger(self, response, fail=True): - result = "hOn Authentication Error\n" - for i, (status, url) in enumerate(self._called_urls): - result += f" {i + 1: 2d} {status} - {url}\n" - result += f"ERROR - {response.status} - {response.request_info.url}\n" - result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" - _LOGGER.error(result) + async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None: + output = "hOn Authentication Error\n" + for i, (status, url) in enumerate(self._request.called_urls): + output += f" {i + 1: 2d} {status} - {url}\n" + output += f"ERROR - {response.status} - {response.request_info.url}\n" + output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" + _LOGGER.error(output) if fail: raise exceptions.HonAuthenticationError("Can't login") - async def _load_login(self): + def _generate_nonce(self) -> str: nonce = secrets.token_hex(16) - nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" + return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" + + async def _load_login(self): + login_url = await self._introduce() + login_url = await self._handle_redirects(login_url) + return await self._login_url(login_url) + + async def _introduce(self) -> str: + redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done") params = { "response_type": "token+id_token", "client_id": const.CLIENT_ID, - "redirect_uri": urllib.parse.quote( - f"{const.APP}://mobilesdk/detect/oauth/done" - ), + "redirect_uri": redirect_uri, "display": "touch", "scope": "api openid refresh_token web", - "nonce": nonce, + "nonce": self._generate_nonce(), } params = "&".join([f"{k}={v}" for k, v in params.items()]) - async with self._session.get( - f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" - ) as response: - self._called_urls.append((response.status, response.request_info.url)) + url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" + async with self._request.get(url) as response: text = await response.text() self._expires = datetime.utcnow() if not (login_url := re.findall("url = '(.+?)'", text)): @@ -94,37 +99,30 @@ class HonAuth: self._parse_token_data(text) raise exceptions.HonNoAuthenticationNeeded() await self._error_logger(response) - return False - async with self._session.get(login_url[0], allow_redirects=False) as redirect1: - self._called_urls.append((redirect1.status, redirect1.request_info.url)) - if not (url := redirect1.headers.get("Location")): - await self._error_logger(redirect1) - return False - async with self._session.get(url, allow_redirects=False) as redirect2: - self._called_urls.append((redirect2.status, redirect2.request_info.url)) - if not ( - url := redirect2.headers.get("Location") - + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" - ): - await self._error_logger(redirect2) - return False - async with self._session.get( - URL(url, encoded=True), headers={"user-agent": const.USER_AGENT} - ) as login_screen: - self._called_urls.append( - (login_screen.status, login_screen.request_info.url) - ) - if context := re.findall( - '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() - ): + return login_url[0] + + async def _manual_redirect(self, url: str) -> str: + async with self._request.get(url, allow_redirects=False) as response: + if not (new_location := response.headers.get("Location")): + await self._error_logger(response) + return new_location + + async def _handle_redirects(self, login_url) -> str: + redirect1 = await self._manual_redirect(login_url) + redirect2 = await self._manual_redirect(redirect1) + return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn" + + async def _login_url(self, login_url: str) -> str: + headers = {"user-agent": const.USER_AGENT} + url = URL(login_url, encoded=True) + async with self._request.get(url, headers=headers) as response: + text = await response.text() + if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text): fw_uid, loaded_str = context[0] loaded = json.loads(loaded_str) - login_url = login_url[0].replace( - "/".join(const.AUTH_API.split("/")[:-1]), "" - ) - return fw_uid, loaded, login_url - await self._error_logger(login_screen) - return False + result = login_url.replace("/".join(const.AUTH_API.split("/")[:-1]), "") + return fw_uid, loaded, result + await self._error_logger(response) async def _login(self, fw_uid, loaded, login_url): data = { @@ -157,13 +155,12 @@ class HonAuth: "aura.token": None, } params = {"r": 3, "other.LightningLoginCustom.login": 1} - async with self._session.post( + async with self._request.post( const.AUTH_API + "/s/sfsites/aura", headers={"Content-Type": "application/x-www-form-urlencoded"}, data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), params=params, ) as response: - self._called_urls.append((response.status, response.request_info.url)) if response.status == 200: try: data = await response.json() @@ -186,8 +183,7 @@ class HonAuth: self._id_token = id_token[0] async def _get_token(self, url): - async with self._session.get(url) as response: - self._called_urls.append((response.status, response.request_info.url)) + async with self._request.get(url) as response: if response.status != 200: await self._error_logger(response) return False @@ -196,15 +192,13 @@ class HonAuth: await self._error_logger(response) return False if "ProgressiveLogin" in url[0]: - async with self._session.get(url[0]) as response: - self._called_urls.append((response.status, response.request_info.url)) + async with self._request.get(url[0]) as response: if response.status != 200: await self._error_logger(response) return False url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text()) url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] - async with self._session.get(url) as response: - self._called_urls.append((response.status, response.request_info.url)) + async with self._request.get(url) as response: if response.status != 200: await self._error_logger(response) return False @@ -214,10 +208,9 @@ class HonAuth: async def _api_auth(self): post_headers = {"id-token": self._id_token} data = self._device.get() - async with self._session.post( + async with self._request.post( f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data ) as response: - self._called_urls.append((response.status, response.request_info.url)) try: json_data = await response.json() except json.JSONDecodeError: @@ -246,10 +239,9 @@ class HonAuth: "refresh_token": self._refresh_token, "grant_type": "refresh_token", } - async with self._session.post( + async with self._request.post( f"{const.AUTH_API}/services/oauth2/token", params=params ) as response: - self._called_urls.append((response.status, response.request_info.url)) if response.status >= 400: await self._error_logger(response, fail=False) return False @@ -261,7 +253,7 @@ class HonAuth: def clear(self): self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2]) - self._called_urls = [] + self._request.called_urls = [] self._cognito_token = "" self._id_token = "" self._access_token = "" diff --git a/pyhon/connection/handler/__init__.py b/pyhon/connection/handler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyhon/connection/handler/anonym.py b/pyhon/connection/handler/anonym.py new file mode 100644 index 0000000..1ed0410 --- /dev/null +++ b/pyhon/connection/handler/anonym.py @@ -0,0 +1,21 @@ +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Callable, Dict + +from pyhon import const +from pyhon.connection.handler.base import ConnectionHandler + +_LOGGER = logging.getLogger(__name__) + + +class HonAnonymousConnectionHandler(ConnectionHandler): + _HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} + + @asynccontextmanager + async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator: + kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS + async with method(*args, **kwargs) as response: + if response.status == 403: + _LOGGER.error("Can't authenticate anymore") + yield response diff --git a/pyhon/connection/handler/auth.py b/pyhon/connection/handler/auth.py new file mode 100644 index 0000000..ecba4cb --- /dev/null +++ b/pyhon/connection/handler/auth.py @@ -0,0 +1,36 @@ +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Optional, Callable, List, Tuple + +import aiohttp + +from pyhon import const +from pyhon.connection.handler.base import ConnectionHandler + +_LOGGER = logging.getLogger(__name__) + + +class HonAuthConnectionHandler(ConnectionHandler): + _HEADERS = {"user-agent": const.USER_AGENT} + + def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: + super().__init__(session) + self._called_urls: List[Tuple[int, str]] = [] + + @property + def called_urls(self) -> List[Tuple[int, str]]: + return self._called_urls + + @called_urls.setter + def called_urls(self, called_urls: List[Tuple[int, str]]) -> None: + self._called_urls = called_urls + + @asynccontextmanager + async def _intercept( + self, method: Callable, *args, loop: int = 0, **kwargs + ) -> AsyncIterator: + kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS + async with method(*args, **kwargs) as response: + self._called_urls.append((response.status, response.request_info.url)) + yield response diff --git a/pyhon/connection/handler/base.py b/pyhon/connection/handler/base.py new file mode 100644 index 0000000..7542df6 --- /dev/null +++ b/pyhon/connection/handler/base.py @@ -0,0 +1,57 @@ +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Optional, Callable, Dict + +import aiohttp +from typing_extensions import Self + +from pyhon import const, exceptions + +_LOGGER = logging.getLogger(__name__) + + +class ConnectionHandler: + _HEADERS: Dict = { + "user-agent": const.USER_AGENT, + "Content-Type": "application/json", + } + + def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: + self._create_session: bool = session is None + self._session: Optional[aiohttp.ClientSession] = session + + async def __aenter__(self) -> Self: + return await self.create() + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.close() + + async def create(self) -> Self: + if self._create_session: + self._session = aiohttp.ClientSession() + return self + + @asynccontextmanager + def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs): + raise NotImplementedError + + @asynccontextmanager + async def get(self, *args, **kwargs) -> AsyncIterator[Callable]: + if self._session is None: + raise exceptions.NoSessionException() + response: Callable + async with self._intercept(self._session.get, *args, **kwargs) as response: + yield response + + @asynccontextmanager + async def post(self, *args, **kwargs) -> AsyncIterator[Callable]: + if self._session is None: + raise exceptions.NoSessionException() + response: Callable + async with self._intercept(self._session.post, *args, **kwargs) as response: + yield response + + async def close(self) -> None: + if self._create_session and self._session is not None: + await self._session.close() diff --git a/pyhon/connection/handler.py b/pyhon/connection/handler/hon.py similarity index 60% rename from pyhon/connection/handler.py rename to pyhon/connection/handler/hon.py index d12b559..7da82a8 100644 --- a/pyhon/connection/handler.py +++ b/pyhon/connection/handler/hon.py @@ -1,69 +1,21 @@ import json -from collections.abc import Generator, AsyncIterator, Coroutine +import logging +from collections.abc import AsyncIterator from contextlib import asynccontextmanager from typing import Optional, Callable, Dict -from typing_extensions import Self import aiohttp +from typing_extensions import Self -from pyhon import const, exceptions -from pyhon.connection.auth import HonAuth, _LOGGER +from pyhon.connection.auth import HonAuth from pyhon.connection.device import HonDevice +from pyhon.connection.handler.base import ConnectionHandler from pyhon.exceptions import HonAuthenticationError - -class HonBaseConnectionHandler: - _HEADERS: Dict = { - "user-agent": const.USER_AGENT, - "Content-Type": "application/json", - } - - def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: - self._create_session: bool = session is None - self._session: Optional[aiohttp.ClientSession] = session - self._auth: Optional[HonAuth] = None - - async def __aenter__(self) -> Self: - return await self.create() - - async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: - await self.close() - - @property - def auth(self) -> Optional[HonAuth]: - return self._auth - - async def create(self) -> Self: - if self._create_session: - self._session = aiohttp.ClientSession() - return self - - @asynccontextmanager - def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs): - raise NotImplementedError - - @asynccontextmanager - async def get(self, *args, **kwargs) -> AsyncIterator[Callable]: - if self._session is None: - raise exceptions.NoSessionException() - response: Callable - async with self._intercept(self._session.get, *args, **kwargs) as response: - yield response - - @asynccontextmanager - async def post(self, *args, **kwargs) -> AsyncIterator[Callable]: - if self._session is None: - raise exceptions.NoSessionException() - response: Callable - async with self._intercept(self._session.post, *args, **kwargs) as response: - yield response - - async def close(self) -> None: - if self._create_session and self._session is not None: - await self._session.close() +_LOGGER = logging.getLogger(__name__) -class HonConnectionHandler(HonBaseConnectionHandler): +class HonConnectionHandler(ConnectionHandler): def __init__( self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None ) -> None: @@ -75,6 +27,11 @@ class HonConnectionHandler(HonBaseConnectionHandler): raise HonAuthenticationError("An email address must be specified") if not self._password: raise HonAuthenticationError("A password address must be specified") + self._auth: Optional[HonAuth] = None + + @property + def auth(self) -> Optional[HonAuth]: + return self._auth @property def device(self) -> HonDevice: @@ -143,17 +100,3 @@ class HonConnectionHandler(HonBaseConnectionHandler): await response.text(), ) raise HonAuthenticationError("Decode Error") - - -class HonAnonymousConnectionHandler(HonBaseConnectionHandler): - _HEADERS: Dict = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} - - @asynccontextmanager - async def _intercept( - self, method: Callable, *args, loop: int = 0, **kwargs - ) -> AsyncIterator: - kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS - async with method(*args, **kwargs) as response: - if response.status == 403: - _LOGGER.error("Can't authenticate anymore") - yield response