"""Asynchronous HTTP client for the Reos API: OAuth token handling and locks."""

import asyncio
import logging

import aiohttp
import jwt
from homeassistant.core import HomeAssistant

_LOGGER = logging.getLogger(__name__)

TOKEN_ENDPOINT = "/oauth/token"
USER_ENDPOINT = "/feapi/v2/user"
LOCK_ENDPOINT = "/feapi/v2/lock"
LOCK_OPEN_ENDPOINT = "/feapi/v2/lock/open"
PROTOCOL = "https"


class ReosApi:
    """Client that authenticates against a Reos host and talks to its lock API.

    An access token is obtained lazily: a stored token is reused while it is
    still valid, otherwise a new pair is requested via the refresh-token grant
    or, failing that, the password grant.
    """

    def __init__(
        self,
        host: str,
        user: str,
        password: str,
        client_id: str,
        client_secret: str,
        access_token: str,
        refresh_token: str,
    ) -> None:
        """Store connection settings and any previously issued tokens.

        Args:
            host: Host name used to build ``https://<host><endpoint>`` URLs.
            user: User name sent with the password grant.
            password: Password sent with the password grant.
            client_id: OAuth client id sent with every token request.
            client_secret: OAuth client secret sent with every token request.
            access_token: Previously issued access token, or ``None``.
            refresh_token: Previously issued refresh token, or ``None``.
        """
        self.host = host
        self.user = user
        self._id = user
        self.password = password
        self.client_id = client_id
        self.client_secret = client_secret
        self._access_token = access_token
        self._refresh_token = refresh_token
        self._update_token_callback = None

    def set_update_token_callback(self, callback) -> None:
        """Register an awaitable ``callback(access_token, refresh_token)``.

        The callback is awaited every time a new token pair is stored.
        """
        self._update_token_callback = callback

    async def __call_update_token_callback(self) -> None:
        """Await the registered token callback, if one was set."""
        if self._update_token_callback:
            await self._update_token_callback(self._access_token, self._refresh_token)

    async def get_access_token(self) -> str:
        """Return a usable access token, requesting a new one when needed.

        Order of preference: the stored access token if it is still valid,
        then the refresh-token grant, then the password grant. Newly obtained
        tokens are stored and reported through the update callback.
        """
        if self._access_token is not None and self.__is_token_valid(self._access_token):
            return self._access_token

        tokens = None
        if self._refresh_token is not None:
            try:
                tokens = await self.__get_access_and_refresh_token_by_refresh_token()
            except (KeyError, aiohttp.ContentTypeError):
                # The response carried no token pair (e.g. the refresh token
                # was rejected). Fall back to the password grant instead of
                # failing on every subsequent call with the same stale token.
                _LOGGER.debug(
                    "Refresh token grant returned no tokens; falling back to password grant"
                )

        if tokens is None:
            tokens = await self.__get_access_and_refresh_token_by_password()

        await self.__store_tokens(*tokens)
        return self._access_token

    async def __store_tokens(self, access, refresh):
        """Remember the new token pair and notify the update callback."""
        self._access_token = access
        self._refresh_token = refresh
        await self.__call_update_token_callback()

    async def __request_tokens(self, payload: dict) -> tuple[str, str]:
        """POST ``payload`` as JSON to the token endpoint and return the token pair.

        Raises:
            KeyError: If the JSON response lacks ``access_token`` or
                ``refresh_token``.
        """
        url = f"{PROTOCOL}://{self.host}{TOKEN_ENDPOINT}"
        async with aiohttp.ClientSession() as session, session.post(url, json=payload) as response:
            body = await response.json()
            return body["access_token"], body["refresh_token"]

    async def __get_access_and_refresh_token_by_password(self) -> tuple[str, str]:
        """Request a token pair using the ``password`` grant."""
        return await self.__request_tokens({
            "username": self.user,
            "password": self.password,
            "grant_type": "password",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "tss",
        })

    async def __get_access_and_refresh_token_by_refresh_token(self) -> tuple[str, str]:
        """Request a token pair using the ``refresh_token`` grant."""
        return await self.__request_tokens({
            "grant_type": "refresh_token",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "tss",
            "refresh_token": self._refresh_token,
        })

    async def open_lock(self, lock_id: int) -> None:
        """POST an open request for the lock identified by ``lock_id``."""
        url = f"{PROTOCOL}://{self.host}{LOCK_OPEN_ENDPOINT}"
        async with await self.__get_session() as session, session.post(
            url, json={"lockId": lock_id}
        ) as response:
            # Entering the response context releases it on exit; the status is
            # only logged so callers see the same (None) result as before.
            if response.status >= 400:
                _LOGGER.warning(
                    "Opening lock %s failed with HTTP status %s", lock_id, response.status
                )

    def __is_token_valid(self, token: str) -> bool:
        """Return whether ``token`` decodes as a JWT that has not expired.

        The signature is not verified; only decodability and the ``exp``
        claim (when present) are checked.
        """
        try:
            # verify_exp must be requested explicitly: disabling signature
            # verification also disables the expiry check by default.
            jwt.decode(token, options={"verify_signature": False, "verify_exp": True})
        except jwt.InvalidTokenError:
            # Covers expired tokens as well as malformed/undecodable ones.
            return False
        return True

    async def __get_session(self) -> aiohttp.ClientSession:
        """Create a client session whose requests carry a Bearer access token."""
        token = await self.get_access_token()
        return aiohttp.ClientSession(headers={"Authorization": f"Bearer {token}"})

    async def test_connection(self) -> bool:
        """Return ``True`` when a GET on the user endpoint answers with HTTP 200."""
        url = f"{PROTOCOL}://{self.host}{USER_ENDPOINT}"
        async with await self.__get_session() as session, session.get(url) as response:
            return response.status == 200

    async def get_locks(self) -> list["ReosLockModel"]:
        """Fetch the lock endpoint and return its entries of type ``lock``.

        Entries in the response's ``data`` list whose ``type`` is not
        ``"lock"`` (compared case-insensitively) are skipped.
        """
        url = f"{PROTOCOL}://{self.host}{LOCK_ENDPOINT}"
        async with await self.__get_session() as session, session.get(url) as response:
            payload = await response.json()

        lock_attributes = [
            entry["attributes"]
            for entry in payload["data"]
            if str(entry["type"]).lower() == "lock"
        ]
        return [
            ReosLockModel(
                attrs["id"],
                attrs["title"],
                attrs["display_title"],
                attrs["is_favorite"],
                self,
            )
            for attrs in lock_attributes
        ]


class ReosLockModel:
    """A single lock as returned by the lock endpoint, bound to its API client."""

    def __init__(self, id: int, name: str, display: str, is_favorite: bool, api: ReosApi) -> None:
        """Store the lock's attributes and the client used to open it.

        Args:
            id: Lock id, sent as ``lockId`` when opening the lock.
            name: Value of the lock's ``title`` attribute.
            display: Value of the lock's ``display_title`` attribute.
            is_favorite: Value of the lock's ``is_favorite`` attribute.
            api: Client used to perform the open request.
        """
        self._id = id
        self.name = name
        self.display = display
        self._api = api
        self.is_favorite = is_favorite

    @property
    def lock_id(self) -> int:
        """Id of this lock."""
        return self._id

    async def open(self) -> None:
        """Open this lock through the bound API client."""
        await self._api.open_lock(self.lock_id)