Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions examples/python/ess-messages/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Outlook (Microsoft Graph)
# CLIENT_ID: Microsoft Entra app registration (e.g., the "Office Desktop Apps" public client)
CLIENT_ID=
# TENANT_ID: your organization's Microsoft Entra (Azure AD) tenant ID
TENANT_ID=

# Webex OAuth integration (from https://developer.webex.com/my-apps)
WEBEX_CLIENT_ID=
WEBEX_CLIENT_SECRET=

# Or use a personal access token directly (12-hour TTL)
# https://developer.webex.com/docs/getting-your-personal-access-token
# WEBEX_ACCESS_TOKEN=
58 changes: 58 additions & 0 deletions examples/python/ess-messages/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# ess-messages

Unified inbox -- aggregates recent email and Webex messages, sorted by date.

```bash
ess-messages --limit 5
```

```
[W] bob@example.com 2026-04-10 12:55 (General)
Team sync moved to 1pm. Please use the updated meeting link.

[E] Alice Smith 2026-04-10 12:43 (Build Health Alert)
Build health alert: latency exceeded threshold for 5 minutes.

[W] carol@example.com 2026-04-10 12:01 (Slides Space for Monday)
Uploaded the latest deck to the room files tab.

[E] broker-alerts-noreply@example.com 2026-04-10 08:26 (Shares of Restricted Stock Ve...)
Your restricted stock vesting transaction has been processed.
```

Output uses [Activity Streams 2.0](https://www.w3.org/TR/activitystreams-core/) (W3C) format.

## Installation

From the workspace root:

```bash
uv sync --all-packages
```

## Auth

Requires credentials for one or both sources. See:
- [Outlook setup](../../../packages/python/ess-outlook/.env.example) -- `CLIENT_ID`, `TENANT_ID`
- [Webex setup](../../../packages/python/ess-webex/.env.example) -- `WEBEX_CLIENT_ID`/`WEBEX_CLIENT_SECRET` or `WEBEX_ACCESS_TOKEN`

Copy this example's `.env.example` to `.env` and fill in values, or set the
variables in your shell environment. Never commit `.env` -- it is meant to
hold secrets.

## Usage

```bash
ess-messages # Latest 25 from both sources
ess-messages --limit 10 # Limit results
ess-messages --email-only # Only email
ess-messages --webex-only # Only Webex
ess-messages --webex-rooms R1,R2 # Specific Webex rooms
ess-messages --json-output # Activity Streams 2.0 JSON
```

## Running Tests

```bash
uv run pytest examples/python/ess-messages/src/ess_messages/aggregator/test_aggregator.py
```
28 changes: 28 additions & 0 deletions examples/python/ess-messages/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[project]
name = "ess-messages"
version = "0.1.0"
description = "Unified inbox -- aggregates email and Webex messages into Activity Streams 2.0"
readme = "README.md"
requires-python = ">=3.12,<3.13"
dependencies = [
"activitystreams2>=0.5.0",
"click>=8.1",
"ess-outlook",
"ess-webex",
"python-dotenv>=1.0.0",
]

[project.scripts]
ess-messages = "ess_messages.__main__:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.uv.sources]
ess-outlook = { workspace = true }
ess-webex = { workspace = true }

[tool.hatch.build.targets.wheel]
packages = ["src/ess_messages"]
exclude = ["**/test_*.py"]
3 changes: 3 additions & 0 deletions examples/python/ess-messages/src/ess_messages/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""ess-messages -- unified inbox for email and Webex."""

__version__ = "0.1.0"
15 changes: 15 additions & 0 deletions examples/python/ess-messages/src/ess_messages/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Entry point for ``python -m ess_messages``."""

from dotenv import load_dotenv

from ess_messages.cli import cli

load_dotenv()


def main() -> None:
cli() # pylint: disable=no-value-for-parameter


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Message aggregation package."""

from .aggregator import MessageAggregator

__all__ = ["MessageAggregator"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Fetch and merge messages from email and Webex into AS2 activities."""

from __future__ import annotations

from activitystreams2 import Create

from ..normalize import normalize_email, normalize_webex, sort_key


class MessageAggregator:
"""Aggregates messages from Outlook and Webex clients."""

def __init__(self, outlook_client=None, webex_client=None) -> None:
self._outlook = outlook_client
self._webex = webex_client

def get_latest(
self,
*,
limit: int = 25,
webex_room_ids: list[str] | None = None,
email_only: bool = False,
webex_only: bool = False,
) -> list[Create]:
"""Fetch, normalize, merge, and sort messages.

Returns up to *limit* AS2 Create activities, newest first.
"""
activities: list[Create] = []

if not webex_only and self._outlook:
activities.extend(self._fetch_emails(limit))

if not email_only and self._webex:
activities.extend(self._fetch_webex(limit, webex_room_ids))

activities.sort(key=sort_key, reverse=True)
return activities[:limit]

def _fetch_emails(self, limit: int) -> list[Create]:
messages = self._outlook.list_messages(limit=limit)
return [normalize_email(msg) for msg in messages]

def _fetch_webex(self, limit: int, room_ids: list[str] | None) -> list[Create]:
if room_ids is None:
rooms = self._webex.list_rooms(max_results=5)
room_ids = [r["id"] for r in rooms]

activities: list[Create] = []
per_room = max(limit // len(room_ids), 1) if room_ids else 0
for room_id in room_ids:
room = self._webex.get_room(room_id)
room_title = room.get("title", "")
messages = self._webex.list_messages(room_id, max_results=per_room)
activities.extend(normalize_webex(msg, room_title) for msg in messages)
return activities
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# ruff: noqa: PLR2004 -- magic numbers in test assertions are expected literals
"""Tests for MessageAggregator -- mocks both clients."""

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock

from ess_messages.aggregator import MessageAggregator

_BASE_TIME = datetime(2026, 4, 10, 12, 0, 0, tzinfo=timezone.utc)


def _email_msg(subject="Test email", minutes_ago=10):
return {
"id": f"email-{minutes_ago}",
"subject": subject,
"sender_name": "Alice",
"sender_address": "alice@example.com",
"date": _BASE_TIME - timedelta(minutes=minutes_ago),
"is_read": True,
}


def _webex_msg(text="Hello", minutes_ago=5):
return {
"id": f"webex-{minutes_ago}",
"room_id": "room-1",
"person_id": "user-1",
"person_email": "bob@example.com",
"text": text,
"markdown": None,
"created": _BASE_TIME - timedelta(minutes=minutes_ago),
}


def _make_aggregator(emails=None, webex_msgs=None):
outlook = MagicMock()
outlook.list_messages.return_value = emails or []

webex = MagicMock()
webex.list_rooms.return_value = [{"id": "room-1"}]
webex.get_room.return_value = {"title": "General"}
webex.list_messages.return_value = webex_msgs or []

return MessageAggregator(outlook_client=outlook, webex_client=webex)


class TestGetLatest:
def test_merges_and_sorts_by_date(self):
aggregator = _make_aggregator(
emails=[_email_msg(minutes_ago=10)],
webex_msgs=[_webex_msg(minutes_ago=5)],
)
results = aggregator.get_latest(limit=10)

assert len(results) == 2
assert results[0]["source"] == "webex"
assert results[1]["source"] == "email"

def test_respects_limit(self):
aggregator = _make_aggregator(
emails=[_email_msg(minutes_ago=i + 1) for i in range(10)],
webex_msgs=[_webex_msg(minutes_ago=i + 1) for i in range(10)],
)
results = aggregator.get_latest(limit=5)

assert len(results) == 5

def test_email_only(self):
aggregator = _make_aggregator(
emails=[_email_msg()],
webex_msgs=[_webex_msg()],
)
results = aggregator.get_latest(email_only=True)

assert len(results) == 1
assert results[0]["source"] == "email"

def test_webex_only(self):
aggregator = _make_aggregator(
emails=[_email_msg()],
webex_msgs=[_webex_msg()],
)
results = aggregator.get_latest(webex_only=True)

assert len(results) == 1
assert results[0]["source"] == "webex"

def test_empty_sources(self):
aggregator = _make_aggregator()
results = aggregator.get_latest()

assert results == []


class TestNormalization:
def test_email_produces_as2(self):
aggregator = _make_aggregator(emails=[_email_msg(subject="Q1 Report")])
results = aggregator.get_latest(email_only=True, limit=1)

activity = results[0]
assert activity["source"] == "email"
assert activity["actor"]["name"] == "Alice"
assert activity["object"]["name"] == "Q1 Report"

def test_webex_produces_as2(self):
aggregator = _make_aggregator(webex_msgs=[_webex_msg(text="Hey team")])
results = aggregator.get_latest(webex_only=True, limit=1)

activity = results[0]
assert activity["source"] == "webex"
assert activity["actor"]["name"] == "bob@example.com"
assert activity["object"]["content"] == "Hey team"
Loading
Loading