74 lines
2.9 KiB
Python
74 lines
2.9 KiB
Python
import pytest
|
|
import json
|
|
from aiohttp import web
|
|
from aioresponses import aioresponses
|
|
from rapt.client import Client
|
|
from rapt.hydrometer import Hydrometer
|
|
|
|
|
|
@pytest.fixture
|
|
async def client(aiohttp_client):
|
|
app = web.Application()
|
|
session = await aiohttp_client(app)
|
|
yield Client("test", "test", None)
|
|
|
|
async def test_get_hydrometers(client):
|
|
with aioresponses() as responses:
|
|
responses.post("https://id.rapt.io/connect/token", payload=json_reader("./tests/json/token_response.json"))
|
|
responses.get("/api/hydrometers/gethydrometers", payload=json_reader("./tests/json/get_hydrometers_response.json"))
|
|
hydrometer = Hydrometer(client)
|
|
hydros = await hydrometer.get_hydrometers()
|
|
|
|
assert hydros is not None
|
|
assert len(hydros) == 1
|
|
|
|
async def test_get_hydrometers_500(client):
|
|
with aioresponses() as responses:
|
|
responses.post("https://id.rapt.io/connect/token", payload=json_reader("./tests/json/token_response.json"))
|
|
responses.get("/api/hydrometers/gethydrometers", status=500)
|
|
hydrometer = Hydrometer(client)
|
|
hydros = await hydrometer.get_hydrometers()
|
|
|
|
assert hydros is None
|
|
|
|
async def test_get_hydrometer(client):
|
|
with aioresponses() as responses:
|
|
responses.post("https://id.rapt.io/connect/token", payload=json_reader("./tests/json/token_response.json"))
|
|
responses.get("/api/hydrometers/gethydrometer", payload=json_reader("./tests/json/get_hydrometer_response.json"))
|
|
hydrometer = Hydrometer(client)
|
|
hydros = await hydrometer.get_hydrometer("")
|
|
|
|
assert hydros is not None
|
|
|
|
async def test_get_hydrometer_500(client):
|
|
with aioresponses() as responses:
|
|
responses.post("https://id.rapt.io/connect/token", payload=json_reader("./tests/json/token_response.json"))
|
|
responses.get("/api/hydrometers/gethydrometer", status=500)
|
|
hydrometer = Hydrometer(client)
|
|
hydros = await hydrometer.get_hydrometer("")
|
|
|
|
assert hydros is None
|
|
|
|
async def test_get_telemetry(client):
|
|
with aioresponses() as responses:
|
|
responses.post("https://id.rapt.io/connect/token", payload=json_reader("./tests/json/token_response.json"))
|
|
responses.get("/api/hydrometers/gettelemetry", payload=json_reader("./tests/json/get_hydrometer_telemetry_response.json"))
|
|
hydrometer = Hydrometer(client)
|
|
telemetry = await hydrometer.get_telemetry("", "", "", "")
|
|
|
|
assert telemetry is not None
|
|
|
|
async def test_get_telemetry_500(client):
|
|
with aioresponses() as responses:
|
|
responses.post("https://id.rapt.io/connect/token", payload=json_reader("./tests/json/token_response.json"))
|
|
responses.get("/api/hydrometers/gettelemetry", status=500)
|
|
hydrometer = Hydrometer(client)
|
|
telemetry = await hydrometer.get_telemetry("", "", "", "")
|
|
|
|
assert telemetry is None
|
|
|
|
|
|
def json_reader(path):
|
|
with open(path) as f:
|
|
return json.load(f)
|