homeassistant-zoneminder-alarm/custom_components/__init__.py

102 lines
2.8 KiB
Python

"""Support for ZoneMinder."""
import logging
from time import sleep
import voluptuous as vol
from zoneminder.zm import ZoneMinder
from homeassistant.const import (
ATTR_ID,
ATTR_NAME,
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PATH,
CONF_SSL,
CONF_TTL,
CONF_USERNAME,
CONF_VERIFY_SSL,
Platform,
)
from homeassistant.core import HomeAssistant, ServiceCall
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.discovery import async_load_platform
from homeassistant.helpers.typing import ConfigType
from homeassistant.components.zoneminder import DOMAIN as ZONEMINDER_DOMAIN
_LOGGER = logging.getLogger(__name__)
DOMAIN = "zoneminder_alarm"
DELAY = 10
SERVICE_TRIGGER_ALARM_STATE = "trigger_alarm_state"
TRIGGER_ALARM_SCHEMA = vol.Schema(
{vol.Required(ATTR_ID): cv.string, vol.Required(ATTR_NAME): cv.string}
)
ALARM_COMMAND_ON = "on"
ALARM_COMMAND_OFF = "off"
ALARM_COMMAND_STATUS = "status"
ALARM_STATUS_URL = "api/monitors/alarm/id:{monitor_id}/command:{command}.json"
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the ZoneMinder component."""
hass.data.setdefault(DOMAIN, {})
for conf in config[DOMAIN]:
zm_id = conf.get(CONF_HOST)
hass.data[DOMAIN].setdefault(zm_id, {})
def trigger_alarm_state(call: ServiceCall) -> None:
zm_id = call.data[ATTR_NAME]
monitor_id = call.data[ATTR_ID]
client = hass.data[ZONEMINDER_DOMAIN][zm_id]
def get_alarm_status():
result = client.get_state(
ALARM_STATUS_URL.format(
monitor_id=monitor_id, command=ALARM_COMMAND_STATUS
)
)
return int(result["status"])
def set_alarm_on():
result = client.get_state(
ALARM_STATUS_URL.format(monitor_id=monitor_id, command=ALARM_COMMAND_ON)
)
return result["status"]
def set_alarm_off():
result = client.get_state(
ALARM_STATUS_URL.format(
monitor_id=monitor_id, command=ALARM_COMMAND_OFF
)
)
return result["status"]
if get_alarm_status() == 0:
set_alarm_on()
hass.data[DOMAIN][zm_id][monitor_id] = DELAY
while hass.data[DOMAIN][zm_id][monitor_id] > 0:
sleep(1)
hass.data[DOMAIN][zm_id][monitor_id] -= 1
set_alarm_off()
else:
hass.data[DOMAIN][zm_id][monitor_id] += DELAY
hass.services.register(
DOMAIN,
SERVICE_TRIGGER_ALARM_STATE,
trigger_alarm_state,
schema=TRIGGER_ALARM_SCHEMA,
)
platforms = (Platform.SWITCH, Platform.SENSOR)
for platform in platforms:
hass.async_create_task(async_load_platform(hass, platform, DOMAIN, {}, config))
return True