Files
rapt-cloud-api/tests/test_hydrometer.py
2025-10-30 10:15:01 +01:00

85 lines
3.5 KiB
Python

import pytest
import json
from aiohttp import web
from aioresponses import aioresponses
from rapt.client import Client
from rapt.hydrometer import Hydrometer
@pytest.fixture
async def client(aiohttp_client):
    """Provide the ``Client`` instance shared by the tests in this module.

    An empty aiohttp application is first started through the
    ``aiohttp_client`` fixture. The resulting test client is not passed to
    ``Client``, which is constructed from two ``"test"`` strings and ``None``.
    """
    # NOTE(review): the returned test client is never used afterwards; the
    # call is kept so the fixture performs the same setup as before.
    _test_client = await aiohttp_client(web.Application())
    yield Client("test", "test", None)
async def test_get_hydrometers(client):
    """``get_hydrometers`` yields exactly one entry for the canned listing."""
    with aioresponses() as mocked:
        # Register the token endpoint first, then the listing endpoint.
        token_payload = json_reader("./tests/json/token_response.json")
        mocked.post("https://id.rapt.io/connect/token", payload=token_payload)
        listing_payload = json_reader("./tests/json/get_hydrometers_response.json")
        mocked.get(
            "https://api.rapt.io/api/hydrometers/gethydrometers",
            payload=listing_payload,
        )

        result = await Hydrometer(client).get_hydrometers()

        assert result is not None
        assert len(result) == 1
async def test_get_hydrometers_missing_fields(client):
    """``get_hydrometers`` still yields one entry for the missing-fields listing."""
    with aioresponses() as mocked:
        # Register the token endpoint first, then the listing endpoint backed
        # by the "missing fields" response file.
        token_payload = json_reader("./tests/json/token_response.json")
        mocked.post("https://id.rapt.io/connect/token", payload=token_payload)
        listing_payload = json_reader(
            "./tests/json/get_hydrometers_response_missing_fields.json"
        )
        mocked.get(
            "https://api.rapt.io/api/hydrometers/gethydrometers",
            payload=listing_payload,
        )

        result = await Hydrometer(client).get_hydrometers()

        assert result is not None
        assert len(result) == 1
async def test_get_hydrometers_500(client):
    """``get_hydrometers`` returns ``None`` when the listing endpoint answers 500."""
    with aioresponses() as mocked:
        # The token request succeeds; only the listing request fails.
        token_payload = json_reader("./tests/json/token_response.json")
        mocked.post("https://id.rapt.io/connect/token", payload=token_payload)
        mocked.get(
            "https://api.rapt.io/api/hydrometers/gethydrometers",
            status=500,
        )

        result = await Hydrometer(client).get_hydrometers()

        assert result is None
async def test_get_hydrometer(client):
    """``get_hydrometer`` returns a non-``None`` value for the canned response.

    The method is called with an empty string as its only argument.
    """
    with aioresponses() as mocked:
        # Register the token endpoint first, then the single-hydrometer endpoint.
        token_payload = json_reader("./tests/json/token_response.json")
        mocked.post("https://id.rapt.io/connect/token", payload=token_payload)
        hydrometer_payload = json_reader("./tests/json/get_hydrometer_response.json")
        mocked.get(
            "https://api.rapt.io/api/hydrometers/gethydrometer",
            payload=hydrometer_payload,
        )

        result = await Hydrometer(client).get_hydrometer("")

        assert result is not None
async def test_get_hydrometer_500(client):
    """``get_hydrometer`` returns ``None`` when its endpoint answers 500.

    The method is called with an empty string as its only argument.
    """
    with aioresponses() as mocked:
        # The token request succeeds; only the hydrometer request fails.
        token_payload = json_reader("./tests/json/token_response.json")
        mocked.post("https://id.rapt.io/connect/token", payload=token_payload)
        mocked.get(
            "https://api.rapt.io/api/hydrometers/gethydrometer",
            status=500,
        )

        result = await Hydrometer(client).get_hydrometer("")

        assert result is None
async def test_get_telemetry(client):
    """``get_telemetry`` returns a non-``None`` value for the canned response.

    The method is called with four empty-string arguments.
    """
    with aioresponses() as mocked:
        # Register the token endpoint first, then the telemetry endpoint.
        token_payload = json_reader("./tests/json/token_response.json")
        mocked.post("https://id.rapt.io/connect/token", payload=token_payload)
        telemetry_payload = json_reader(
            "./tests/json/get_hydrometer_telemetry_response.json"
        )
        mocked.get(
            "https://api.rapt.io/api/hydrometers/gettelemetry",
            payload=telemetry_payload,
        )

        result = await Hydrometer(client).get_telemetry("", "", "", "")

        assert result is not None
async def test_get_telemetry_500(client):
    """``get_telemetry`` returns ``None`` when its endpoint answers 500.

    The method is called with four empty-string arguments.
    """
    with aioresponses() as mocked:
        # The token request succeeds; only the telemetry request fails.
        token_payload = json_reader("./tests/json/token_response.json")
        mocked.post("https://id.rapt.io/connect/token", payload=token_payload)
        mocked.get(
            "https://api.rapt.io/api/hydrometers/gettelemetry",
            status=500,
        )

        result = await Hydrometer(client).get_telemetry("", "", "", "")

        assert result is None
def json_reader(path):
    """Load and return the JSON document stored at *path*.

    The file is decoded explicitly as UTF-8 instead of relying on the
    platform's default text encoding, so files containing non-ASCII
    characters load the same way on every machine.

    Args:
        path: Location of the JSON file; passed straight to ``open``.

    Returns:
        The deserialized value produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)