"""Helper library to create voice apps for Rhasspy using the Hermes protocol."""
import argparse
import asyncio
import logging
import re
from copy import deepcopy
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List, Optional, Union
import paho.mqtt.client as mqtt
import rhasspyhermes.cli as hermes_cli
from rhasspyhermes.client import HermesClient
from rhasspyhermes.dialogue import (
DialogueContinueSession,
DialogueEndSession,
DialogueIntentNotRecognized,
DialogueNotification,
DialogueStartSession,
)
from rhasspyhermes.nlu import NluIntent, NluIntentNotRecognized
from rhasspyhermes.wake import HotwordDetected
_LOGGER = logging.getLogger("HermesApp")
[docs]@dataclass
class ContinueSession:
"""Helper class to continue the current session.
Attributes:
text: The text the TTS should say to start this additional request of the session.
intent_filter: A list of intents names to restrict the NLU resolution on the
answer of this query.
custom_data: An update to the session's custom data. If not provided, the custom data
will stay the same.
send_intent_not_recognized: Indicates whether the dialogue manager should handle non recognized
intents by itself or send them for the client to handle.
"""
custom_data: Optional[str] = None
text: Optional[str] = None
intent_filter: Optional[List[str]] = None
send_intent_not_recognized: bool = False
[docs]@dataclass
class EndSession:
"""Helper class to end the current session.
Attributes:
text: The text the TTS should say to end the session.
custom_data: An update to the session's custom data. If not provided, the custom data
will stay the same.
"""
text: Optional[str] = None
custom_data: Optional[str] = None
[docs]@dataclass
class TopicData:
"""Helper class for topic subscription.
Attributes:
topic: The MQTT topic.
data: A dictionary holding extracted data for the given placeholder.
"""
topic: str
data: Dict[str, str]
[docs]class HermesApp(HermesClient):
"""A Rhasspy app using the Hermes protocol.
Attributes:
args: Command-line arguments for the Hermes app.
Example:
.. literalinclude:: ../examples/time_app.py
"""
[docs] def __init__(
self,
name: str,
parser: Optional[argparse.ArgumentParser] = None,
mqtt_client: Optional[mqtt.Client] = None,
**kwargs
):
"""Initialize the Rhasspy Hermes app.
Arguments:
name: The name of this object.
parser: An argument parser.
If the argument is not specified, the object creates an
argument parser itself.
mqtt_client: An MQTT client. If the argument
is not specified, the object creates an MQTT client itself.
**kwargs: Other arguments. This supports the same arguments as the command-line
arguments, such has ``host`` and ``port``. Arguments specified by the user
on the command line have precedence over arguments passed as ``**kwargs``.
"""
if parser is None:
parser = argparse.ArgumentParser(prog=name)
# Add default arguments
hermes_cli.add_hermes_args(parser)
# overwrite argument defaults inside parser with argparse.SUPPRESS
# so arguments that are not provided get ignored
suppress_parser = deepcopy(parser)
for action in suppress_parser._actions:
action.default = argparse.SUPPRESS
supplied_args = vars(suppress_parser.parse_args())
default_args = vars(parser.parse_args([]))
# Command-line arguments take precedence over the arguments of the HermesApp.__init__
args = {**default_args, **kwargs, **supplied_args}
self.args = argparse.Namespace(**args)
# Set up logging
hermes_cli.setup_logging(self.args)
_LOGGER.debug(self.args)
# Create MQTT client
if mqtt_client is None:
mqtt_client = mqtt.Client()
# Initialize HermesClient
# pylint: disable=no-member
super().__init__(name, mqtt_client, site_ids=self.args.site_id)
self._callbacks_hotword: List[Callable[[HotwordDetected], Awaitable[None]]] = []
self._callbacks_intent: Dict[
str,
List[Callable[[NluIntent], Awaitable[None]]],
] = {}
self._callbacks_intent_not_recognized: List[
Callable[[NluIntentNotRecognized], Awaitable[None]]
] = []
self._callbacks_dialogue_intent_not_recognized: List[
Callable[[DialogueIntentNotRecognized], Awaitable[None]]
] = []
self._callbacks_topic: Dict[
str, List[Callable[[TopicData, bytes], Awaitable[None]]]
] = {}
self._callbacks_topic_regex: List[
Callable[[TopicData, bytes], Awaitable[None]]
] = []
self._additional_topic: List[str] = []
def _subscribe_callbacks(self) -> None:
# Remove duplicate intent names
intent_names: List[str] = list(set(self._callbacks_intent.keys()))
topics: List[str] = [
NluIntent.topic(intent_name=intent_name) for intent_name in intent_names
]
if self._callbacks_hotword:
topics.append(HotwordDetected.topic())
if self._callbacks_intent_not_recognized:
topics.append(NluIntentNotRecognized.topic())
if self._callbacks_dialogue_intent_not_recognized:
topics.append(DialogueIntentNotRecognized.topic())
topic_names: List[str] = list(set(self._callbacks_topic.keys()))
topics.extend(topic_names)
topics.extend(self._additional_topic)
self.subscribe_topics(*topics)
[docs] async def on_raw_message(self, topic: str, payload: bytes):
"""This method handles messages from the MQTT broker.
Arguments:
topic: The topic of the received MQTT message.
payload: The payload of the received MQTT message.
.. warning:: Don't override this method in your app. This is where all the magic happens in Rhasspy Hermes App.
"""
try:
if HotwordDetected.is_topic(topic):
# hermes/hotword/<wakeword_id>/detected
try:
hotword_detected = HotwordDetected.from_json(payload)
for function_h in self._callbacks_hotword:
await function_h(hotword_detected)
except KeyError as key:
_LOGGER.error(
"Missing key %s in JSON payload for %s: %s", key, topic, payload
)
elif NluIntent.is_topic(topic):
# hermes/intent/<intent_name>
try:
nlu_intent = NluIntent.from_json(payload)
intent_name = nlu_intent.intent.intent_name
if intent_name in self._callbacks_intent:
for function_i in self._callbacks_intent[intent_name]:
await function_i(nlu_intent)
except KeyError as key:
_LOGGER.error(
"Missing key %s in JSON payload for %s: %s", key, topic, payload
)
elif NluIntentNotRecognized.is_topic(topic):
# hermes/nlu/intentNotRecognized
try:
nlu_intent_not_recognized = NluIntentNotRecognized.from_json(
payload
)
for function_inr in self._callbacks_intent_not_recognized:
await function_inr(nlu_intent_not_recognized)
except KeyError as key:
_LOGGER.error(
"Missing key %s in JSON payload for %s: %s", key, topic, payload
)
elif DialogueIntentNotRecognized.is_topic(topic):
# hermes/dialogueManager/intentNotRecognized
try:
dialogue_intent_not_recognized = (
DialogueIntentNotRecognized.from_json(payload)
)
for function_dinr in self._callbacks_dialogue_intent_not_recognized:
await function_dinr(dialogue_intent_not_recognized)
except KeyError as key:
_LOGGER.error(
"Missing key %s in JSON payload for %s: %s", key, topic, payload
)
else:
unexpected_topic = True
if topic in self._callbacks_topic:
for function_1 in self._callbacks_topic[topic]:
await function_1(TopicData(topic, {}), payload)
unexpected_topic = False
else:
for function_2 in self._callbacks_topic_regex:
if hasattr(function_2, "topic_extras"):
topic_extras = getattr(function_2, "topic_extras")
for pattern, named_positions in topic_extras:
if re.match(pattern, topic) is not None:
data = TopicData(topic, {})
parts = topic.split(sep="/")
if named_positions is not None:
for name, position in named_positions.items():
data.data[name] = parts[position]
await function_2(data, payload)
unexpected_topic = False
if unexpected_topic:
_LOGGER.warning("Unexpected topic: %s", topic)
except Exception:
_LOGGER.exception("on_raw_message")
[docs] def on_hotword(
self, function: Callable[[HotwordDetected], Awaitable[None]]
) -> Callable[[HotwordDetected], Awaitable[None]]:
"""Apply this decorator to a function that you want to act on a detected hotword.
The decorated function has a :class:`rhasspyhermes.wake.HotwordDetected` object as an argument
and doesn't have a return value.
Example:
.. code-block:: python
@app.on_hotword
async def wake(hotword: HotwordDetected):
print(f"Hotword {hotword.model_id} detected on site {hotword.site_id}")
If a hotword has been detected, the ``wake`` function is called with the ``hotword`` argument.
This object holds information about the detected hotword.
"""
self._callbacks_hotword.append(function)
return function
[docs] def on_intent(
self, *intent_names: str
) -> Callable[
[
Callable[
[NluIntent], Union[Awaitable[ContinueSession], Awaitable[EndSession]]
]
],
Callable[[NluIntent], Awaitable[None]],
]:
"""Apply this decorator to a function that you want to act on a received intent.
Arguments:
intent_names: Names of the intents you want the function to act on.
The decorated function has a :class:`rhasspyhermes.nlu.NluIntent` object as an argument
and needs to return a :class:`ContinueSession` or :class:`EndSession` object.
If the function returns a :class:`ContinueSession` object, the intent's session is continued after
saying the supplied text. If the function returns a a :class:`EndSession` object, the intent's session
is ended after saying the supplied text, or immediately when no text is supplied.
Example:
.. code-block:: python
@app.on_intent("GetTime")
async def get_time(intent: NluIntent):
return EndSession("It's too late.")
If the intent with name GetTime has been detected, the ``get_time`` function is called
with the ``intent`` argument. This object holds information about the detected intent.
"""
def wrapper(
function: Callable[
[NluIntent], Union[Awaitable[ContinueSession], Awaitable[EndSession]]
]
) -> Callable[[NluIntent], Awaitable[None]]:
async def wrapped(intent: NluIntent) -> None:
message = await function(intent)
if isinstance(message, EndSession):
if intent.session_id is not None:
self.publish(
DialogueEndSession(
session_id=intent.session_id,
text=message.text,
custom_data=message.custom_data,
)
)
else:
_LOGGER.error(
"Cannot end session of intent without session ID."
)
elif isinstance(message, ContinueSession):
if intent.session_id is not None:
self.publish(
DialogueContinueSession(
session_id=intent.session_id,
text=message.text,
intent_filter=message.intent_filter,
custom_data=message.custom_data,
send_intent_not_recognized=message.send_intent_not_recognized,
)
)
else:
_LOGGER.error(
"Cannot continue session of intent without session ID."
)
for intent_name in intent_names:
try:
self._callbacks_intent[intent_name].append(wrapped)
except KeyError:
self._callbacks_intent[intent_name] = [wrapped]
return wrapped
return wrapper
[docs] def on_intent_not_recognized(
self,
function: Callable[
[NluIntentNotRecognized],
Union[Awaitable[ContinueSession], Awaitable[EndSession], Awaitable[None]],
],
) -> Callable[[NluIntentNotRecognized], Awaitable[None]]:
"""Apply this decorator to a function that you want to act when the NLU system
hasn't recognized an intent.
The decorated function has a :class:`rhasspyhermes.nlu.NluIntentNotRecognized` object as an argument
and can return a :class:`ContinueSession` or :class:`EndSession` object or have no return value.
If the function returns a :class:`ContinueSession` object, the current session is continued after
saying the supplied text. If the function returns a a :class:`EndSession` object, the current session
is ended after saying the supplied text, or immediately when no text is supplied. If the function doesn't
have a return value, nothing is changed to the session.
Example:
.. code-block:: python
@app.on_intent_not_recognized
async def not_understood(intent_not_recognized: NluIntentNotRecognized):
print(f"Didn't understand \"{intent_not_recognized.input}\" on site {intent_not_recognized.site_id}")
If an intent hasn't been recognized, the ``not_understood`` function is called
with the ``intent_not_recognized`` argument. This object holds information about the not recognized intent.
"""
async def wrapped(inr: NluIntentNotRecognized) -> None:
message = await function(inr)
if isinstance(message, EndSession):
if inr.session_id is not None:
self.publish(
DialogueEndSession(
session_id=inr.session_id,
text=message.text,
custom_data=message.custom_data,
)
)
else:
_LOGGER.error(
"Cannot end session of NLU intent not recognized message without session ID."
)
elif isinstance(message, ContinueSession):
if inr.session_id is not None:
self.publish(
DialogueContinueSession(
session_id=inr.session_id,
text=message.text,
intent_filter=message.intent_filter,
custom_data=message.custom_data,
send_intent_not_recognized=message.send_intent_not_recognized,
)
)
else:
_LOGGER.error(
"Cannot continue session of NLU intent not recognized message without session ID."
)
self._callbacks_intent_not_recognized.append(wrapped)
return wrapped
[docs] def on_dialogue_intent_not_recognized(
self,
function: Callable[
[DialogueIntentNotRecognized],
Union[Awaitable[ContinueSession], Awaitable[EndSession], Awaitable[None]],
],
) -> Callable[[DialogueIntentNotRecognized], Awaitable[None]]:
"""Apply this decorator to a function that you want to act when the dialogue manager
failed to recognize an intent and you requested to notify you of this event with the
`sendIntentNotRecognized` flag.
The decorated function has a :class:`rhasspyhermes.dialogue.DialogueIntentNotRecognized` object as an argument
and can return a :class:`ContinueSession` or :class:`EndSession` object or have no return value.
If the function returns a :class:`ContinueSession` object, the current session is continued after
saying the supplied text. If the function returns a a :class:`EndSession` object, the current session
is ended after saying the supplied text, or immediately when no text is supplied. If the function doesn't
have a return value, nothing is changed to the session.
Example:
.. code-block:: python
@app.on_dialogue_intent_not_recognized
async def not_understood(intent_not_recognized: DialogueIntentNotRecognized):
print(f"Didn't understand \"{intent_not_recognized.input}\" on site {intent_not_recognized.site_id}")
If an intent hasn't been recognized, the ``not_understood`` function is called
with the ``intent_not_recognized`` argument. This object holds information about the not recognized intent.
"""
async def wrapped(inr: DialogueIntentNotRecognized) -> None:
message = await function(inr)
if isinstance(message, EndSession):
if inr.session_id is not None:
self.publish(
DialogueEndSession(
session_id=inr.session_id,
text=message.text,
custom_data=message.custom_data,
)
)
else:
_LOGGER.error(
"Cannot end session of dialogue intent not recognized message without session ID."
)
elif isinstance(message, ContinueSession):
if inr.session_id is not None:
self.publish(
DialogueContinueSession(
session_id=inr.session_id,
text=message.text,
intent_filter=message.intent_filter,
custom_data=message.custom_data,
send_intent_not_recognized=message.send_intent_not_recognized,
)
)
else:
_LOGGER.error(
"Cannot continue session of dialogue intent not recognized message without session ID."
)
self._callbacks_dialogue_intent_not_recognized.append(wrapped)
return wrapped
[docs] def on_topic(self, *topic_names: str):
"""Apply this decorator to a function that you want to act on a received raw MQTT message.
Arguments:
topic_names: The MQTT topics you want the function to act on.
The decorated function has a :class:`TopicData` and a :class:`bytes` object as its arguments.
The former holds data about the topic and the latter about the payload of the MQTT message.
Example:
.. code-block:: python
@app.on_topic("hermes/+/{site_id}/playBytes/#")
async def test_topic1(data: TopicData, payload: bytes):
_LOGGER.debug("topic: %s, site_id: %s", data.topic, data.data.get("site_id"))
.. note:: The topic names can contain MQTT wildcards (`+` and `#`) or templates (`{foobar}`).
In the latter case, the value of the named template is available in the decorated function
as part of the :class:`TopicData` argument.
"""
def wrapper(function):
async def wrapped(data: TopicData, payload: bytes):
await function(data, payload)
replaced_topic_names = []
for topic_name in topic_names:
named_positions = {}
parts = topic_name.split(sep="/")
length = len(parts) - 1
def placeholder_mapper(part):
i, token = tuple(part)
if token.startswith("{") and token.endswith("}"):
named_positions[token[1:-1]] = i
return "+"
return token
parts = list(map(placeholder_mapper, enumerate(parts)))
replaced_topic_name = "/".join(parts)
def regex_mapper(part):
i, token = tuple(part)
value = token
if i == 0:
value = (
"^[^+#/]"
if token == "+"
else "[^/]+"
if length == 0 and token == "#"
else "^" + token
)
elif i < length:
value = "[^/]+" if token == "+" else token
elif i == length:
value = (
"[^/]+"
if token == "#"
else "[^/]+$"
if token == "+"
else token.replace("*", r"\*")
+ "$" # not escaping * prevents the regex from matching
)
return value
pattern = "/".join(map(regex_mapper, enumerate(parts)))
if topic_name == pattern[1:-1]:
try:
self._callbacks_topic[topic_name].append(wrapped)
except KeyError:
self._callbacks_topic[topic_name] = [wrapped]
else:
replaced_topic_names.append(replaced_topic_name)
if not hasattr(wrapped, "topic_extras"):
wrapped.topic_extras = []
wrapped.topic_extras.append(
(
re.compile(pattern),
named_positions if len(named_positions) > 0 else None,
)
)
if hasattr(wrapped, "topic_extras"):
self._callbacks_topic_regex.append(wrapped)
self._additional_topic.extend(replaced_topic_names)
return wrapped
return wrapper
[docs] def run(self):
"""Run the app. This method:
- subscribes to all MQTT topics for the functions you decorated;
- connects to the MQTT broker;
- starts the MQTT event loop and reacts to received MQTT messages.
"""
# Subscribe to callbacks
self._subscribe_callbacks()
# Try to connect
# pylint: disable=no-member
_LOGGER.debug("Connecting to %s:%s", self.args.host, self.args.port)
hermes_cli.connect(self.mqtt_client, self.args)
self.mqtt_client.loop_start()
try:
# Run main loop
asyncio.run(self.handle_messages_async())
except KeyboardInterrupt:
pass
finally:
self.mqtt_client.loop_stop()
[docs] def notify(self, text: str, site_id: str = "default"):
"""Send a dialogue notification.
Use this to inform the user of something without expecting a response.
Arguments:
text: The text to say.
site_id: The ID of the site where the text should be said.
"""
notification = DialogueNotification(text)
self.publish(DialogueStartSession(init=notification, site_id=site_id))