143 lines
5.8 KiB
Python
143 lines
5.8 KiB
Python
from homeassistant.core import HomeAssistant
|
|
from datetime import datetime
|
|
import dateutil.parser
|
|
import aiohttp
|
|
import asyncio
|
|
import logging
|
|
import jwt
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
TOKEN_ENDPOINT = "/oauth/token"
|
|
USER_ENDPOINT = "/feapi/v2/user"
|
|
LOCK_ENDPOINT = "/feapi/v2/lock"
|
|
LOCK_OPEN_ENDPOINT = "/feapi/v2/lock/open"
|
|
PROTOCOL = "https"
|
|
|
|
class ReosApi:
|
|
|
|
def __init__(self, host: str, user: str, password: str, client_id: str, client_secret: str, access_token: str, refresh_token: str) -> None:
|
|
self.host = host
|
|
self.user = user
|
|
self._id = user
|
|
self.password = password
|
|
self.client_id = client_id
|
|
self.client_secret = client_secret
|
|
self._access_token = access_token
|
|
self._refresh_token = refresh_token
|
|
self._update_token_callback = None
|
|
|
|
def set_update_token_callback(self, callback) -> None:
|
|
self._update_token_callback = callback
|
|
|
|
async def __call_update_token_callback(self) -> None:
|
|
if self._update_token_callback:
|
|
await self._update_token_callback(self._access_token, self._refresh_token)
|
|
|
|
async def get_access_token(self) -> str:
|
|
if self._access_token is not None and self.__is_token_valid(self._access_token):
|
|
return self._access_token
|
|
elif self._refresh_token is not None:
|
|
access, refresh = await self.__get_access_and_refresh_token_by_refresh_token()
|
|
await self.__store_tokens(access, refresh)
|
|
return self._access_token
|
|
else:
|
|
access, refresh = await self.__get_access_and_refresh_token_by_password()
|
|
await self.__store_tokens(access, refresh)
|
|
return self._access_token
|
|
|
|
async def __store_tokens(self, access, refresh):
|
|
self._access_token = access
|
|
self._refresh_token = refresh
|
|
await self.__call_update_token_callback()
|
|
|
|
async def __get_access_and_refresh_token_by_password(self) -> tuple[str, str]:
|
|
async with aiohttp.ClientSession() as session, session.post(f"{PROTOCOL}://{self.host}{TOKEN_ENDPOINT}", json={
|
|
"username": self.user,
|
|
"password": self.password,
|
|
"grant_type": "password",
|
|
"client_id": self.client_id,
|
|
"client_secret": self.client_secret,
|
|
"scope": "tss"
|
|
}) as response:
|
|
_json = await response.json()
|
|
return _json["access_token"], _json["refresh_token"]
|
|
|
|
async def __get_access_and_refresh_token_by_refresh_token(self) -> tuple[str, str]:
|
|
async with aiohttp.ClientSession() as session, session.post(f"{PROTOCOL}://{self.host}{TOKEN_ENDPOINT}", json={
|
|
"grant_type": "refresh_token",
|
|
"client_id": self.client_id,
|
|
"client_secret": self.client_secret,
|
|
"scope": "tss",
|
|
"refresh_token": self._refresh_token
|
|
}) as response:
|
|
_json = await response.json()
|
|
return _json["access_token"], _json["refresh_token"]
|
|
|
|
def __parse_datetime(self, dt: datetime) -> datetime | None:
|
|
if dt is not None:
|
|
return dateutil.parser.parse(str(dt))
|
|
else:
|
|
return None
|
|
|
|
async def open_lock(self, lock_id: int) -> None:
|
|
async with await self.__get_session() as session:
|
|
await session.post(f"{PROTOCOL}://{self.host}{LOCK_OPEN_ENDPOINT}", json={"lockId": lock_id})
|
|
|
|
def __is_token_valid(self, token: str) -> bool:
|
|
try:
|
|
token = jwt.decode(token, options={"verify_signature": False, "verify_exp": True}, verify=True)
|
|
return True
|
|
except jwt.ExpiredSignatureError:
|
|
return False
|
|
|
|
async def __get_session(self) -> aiohttp.ClientSession:
|
|
token = await self.get_access_token()
|
|
return aiohttp.ClientSession(headers={"Authorization": f"Bearer {token}"})
|
|
|
|
async def test_connection(self) -> bool:
|
|
async with await self.__get_session() as session, session.get(f"{PROTOCOL}://{self.host}{USER_ENDPOINT}") as response:
|
|
return response.status == 200
|
|
|
|
async def get_locks(self) -> list["ReosLockModel"]:
|
|
async with await self.__get_session() as session, session.get(f"{PROTOCOL}://{self.host}{LOCK_ENDPOINT}") as response:
|
|
_json = await response.json()
|
|
_api_locks = _json["data"]
|
|
locks: list[ReosLockModel] = []
|
|
for _api_lock in _api_locks:
|
|
if str(_api_lock["type"]).lower() == "lock":
|
|
_attr = _api_lock["attributes"]
|
|
_lock = ReosLockModel(
|
|
_attr["id"],
|
|
_attr["title"],
|
|
_attr["display_title"],
|
|
_attr["is_favorite"],
|
|
self,
|
|
updated_at=self.__parse_datetime(_attr["updated_at"]),
|
|
created_at=self.__parse_datetime(_attr["created_at"]),
|
|
available_from=self.__parse_datetime(_attr["available_from"]),
|
|
available_to=self.__parse_datetime(_attr["available_to"])
|
|
)
|
|
locks.append(_lock)
|
|
return locks
|
|
|
|
|
|
class ReosLockModel:
|
|
def __init__(self, id: int, name: str, display: str, is_favorite: bool, api: ReosApi, available_from: datetime = None, available_to: datetime = None, created_at: datetime = None, updated_at: datetime = None) -> None:
|
|
self._id = id
|
|
self.name = name
|
|
self.display = display
|
|
self._api = api
|
|
self.is_favorite = is_favorite
|
|
self.available_from = available_from
|
|
self.available_to = available_to
|
|
self.created_at = created_at
|
|
self.updated_at = updated_at
|
|
|
|
@property
|
|
def lock_id(self) -> int:
|
|
return self._id
|
|
|
|
async def open(self) -> None:
|
|
await self._api.open_lock(self.lock_id)
|