hass-reos-integration/custom_components/reos_integration/reos_api.py

143 lines
5.8 KiB
Python

from homeassistant.core import HomeAssistant
from datetime import datetime
import dateutil.parser
import aiohttp
import asyncio
import logging
import jwt
# Module-level logger, named after this module's import path.
_LOGGER: logging.Logger = logging.getLogger(__name__)

# URL scheme placed in front of the configured host for every request.
PROTOCOL: str = "https"

# Request paths; each is appended to "<PROTOCOL>://<host>" to form a URL.
TOKEN_ENDPOINT: str = "/oauth/token"
USER_ENDPOINT: str = "/feapi/v2/user"
LOCK_ENDPOINT: str = "/feapi/v2/lock"
LOCK_OPEN_ENDPOINT: str = "/feapi/v2/lock/open"
class ReosApi:
def __init__(self, host: str, user: str, password: str, client_id: str, client_secret: str, access_token: str, refresh_token: str) -> None:
self.host = host
self.user = user
self._id = user
self.password = password
self.client_id = client_id
self.client_secret = client_secret
self._access_token = access_token
self._refresh_token = refresh_token
self._update_token_callback = None
def set_update_token_callback(self, callback) -> None:
self._update_token_callback = callback
async def __call_update_token_callback(self) -> None:
if self._update_token_callback:
await self._update_token_callback(self._access_token, self._refresh_token)
async def get_access_token(self) -> str:
if self._access_token is not None and self.__is_token_valid(self._access_token):
return self._access_token
elif self._refresh_token is not None:
access, refresh = await self.__get_access_and_refresh_token_by_refresh_token()
await self.__store_tokens(access, refresh)
return self._access_token
else:
access, refresh = await self.__get_access_and_refresh_token_by_password()
await self.__store_tokens(access, refresh)
return self._access_token
async def __store_tokens(self, access, refresh):
self._access_token = access
self._refresh_token = refresh
await self.__call_update_token_callback()
async def __get_access_and_refresh_token_by_password(self) -> tuple[str, str]:
async with aiohttp.ClientSession() as session, session.post(f"{PROTOCOL}://{self.host}{TOKEN_ENDPOINT}", json={
"username": self.user,
"password": self.password,
"grant_type": "password",
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": "tss"
}) as response:
_json = await response.json()
return _json["access_token"], _json["refresh_token"]
async def __get_access_and_refresh_token_by_refresh_token(self) -> tuple[str, str]:
async with aiohttp.ClientSession() as session, session.post(f"{PROTOCOL}://{self.host}{TOKEN_ENDPOINT}", json={
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": "tss",
"refresh_token": self._refresh_token
}) as response:
_json = await response.json()
return _json["access_token"], _json["refresh_token"]
def __parse_datetime(self, dt: datetime) -> datetime | None:
if dt is not None:
return dateutil.parser.parse(str(dt))
else:
return None
async def open_lock(self, lock_id: int) -> None:
async with await self.__get_session() as session:
await session.post(f"{PROTOCOL}://{self.host}{LOCK_OPEN_ENDPOINT}", json={"lockId": lock_id})
def __is_token_valid(self, token: str) -> bool:
try:
token = jwt.decode(token, options={"verify_signature": False, "verify_exp": True}, verify=True)
return True
except jwt.ExpiredSignatureError:
return False
async def __get_session(self) -> aiohttp.ClientSession:
token = await self.get_access_token()
return aiohttp.ClientSession(headers={"Authorization": f"Bearer {token}"})
async def test_connection(self) -> bool:
async with await self.__get_session() as session, session.get(f"{PROTOCOL}://{self.host}{USER_ENDPOINT}") as response:
return response.status == 200
async def get_locks(self) -> list["ReosLockModel"]:
async with await self.__get_session() as session, session.get(f"{PROTOCOL}://{self.host}{LOCK_ENDPOINT}") as response:
_json = await response.json()
_api_locks = _json["data"]
locks: list[ReosLockModel] = []
for _api_lock in _api_locks:
if str(_api_lock["type"]).lower() == "lock":
_attr = _api_lock["attributes"]
_lock = ReosLockModel(
_attr["id"],
_attr["title"],
_attr["display_title"],
_attr["is_favorite"],
self,
updated_at=self.__parse_datetime(_attr["updated_at"]),
created_at=self.__parse_datetime(_attr["created_at"]),
available_from=self.__parse_datetime(_attr["available_from"]),
available_to=self.__parse_datetime(_attr["available_to"])
)
locks.append(_lock)
return locks
class ReosLockModel:
    """One lock as described by the API, bound to the client that fetched it."""

    def __init__(self, id: int, name: str, display: str, is_favorite: bool, api: ReosApi, available_from: datetime = None, available_to: datetime = None, created_at: datetime = None, updated_at: datetime = None) -> None:
        """Keep the lock's fields and the API client used to operate it.

        Args:
            id: Lock identifier, exposed read-only through ``lock_id``.
            name: Stored as ``name``.
            display: Stored as ``display``.
            is_favorite: Stored as ``is_favorite``.
            api: Client whose ``open_lock`` is called by ``open``.
            available_from: Optional timestamp, stored unchanged.
            available_to: Optional timestamp, stored unchanged.
            created_at: Optional timestamp, stored unchanged.
            updated_at: Optional timestamp, stored unchanged.
        """
        # Identity and descriptive fields.
        self._id, self.name, self.display = id, name, display
        # Owning client and favourite flag.
        self._api, self.is_favorite = api, is_favorite
        # Timestamps as handed in by the caller; each may be None.
        self.available_from, self.available_to = available_from, available_to
        self.created_at, self.updated_at = created_at, updated_at

    @property
    def lock_id(self) -> int:
        """Identifier passed to the constructor as ``id``."""
        return self._id

    async def open(self) -> None:
        """Ask the owning API client to open this lock."""
        client = self._api
        await client.open_lock(self.lock_id)