How to Build a Simple ActivityPub Reminder Bot in Python
-
This tutorial will guide you through building a simple ActivityPub bot using Python. The bot will listen for mentions and, when it receives a message in a specific format, it will schedule and send a reminder back to the user after a specified delay.
For example, if a user mentions the bot with a message like "@reminder@your.host.com 10m check the oven", the bot will reply 10 minutes later with a message like "🔔 Reminder for @user: check the oven".
Prerequisites
To follow this tutorial, you will need Python 3.10+ and the following libraries:
- apkit[server]: A powerful toolkit for building ActivityPub applications in Python. We use the server extra, which includes FastAPI-based components.
- uvicorn: An ASGI server to run our FastAPI application.
- cryptography: Used for generating and managing the cryptographic keys required for ActivityPub.
- uv: An optional but recommended fast package manager.
You can install these dependencies using uv or pip.

    # Initialize a new project with uv
    uv init
    # Install dependencies
    uv add "apkit[server]" uvicorn cryptography

Project Structure
The project structure is minimal, consisting of a single Python file for our bot's logic.

    .
    ├── main.py
    └── private_key.pem

- main.py: Contains all the code for the bot.
- private_key.pem: The private key for the bot's Actor. This will be generated automatically on the first run.
Code Walkthrough
Our application logic can be broken down into the following steps:
- Imports and Configuration: Set up necessary imports and basic configuration variables.
- Key Generation: Prepare the cryptographic keys needed for signing activities.
- Actor Definition: Define the bot's identity on the Fediverse.
- Server Initialization: Set up the apkit ActivityPub server.
- Data Storage: Implement a simple in-memory store for created activities.
- Reminder Logic: Code the core logic for parsing reminders and sending notifications.
- Endpoint Definitions: Create the necessary web endpoints (/actor, /inbox, etc.).
- Activity Handlers: Process incoming activities from other servers.
- Application Startup: Run the server.
Let's dive into each section of the main.py file.
1. Imports and Configuration
First, we import the necessary modules and define the basic configuration for our bot.

    # main.py
    import asyncio
    import logging
    import re
    import uuid
    import os
    from datetime import timedelta, datetime

    # Imports from FastAPI, cryptography, and apkit
    from fastapi import Request, Response
    from fastapi.responses import JSONResponse
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives import serialization as crypto_serialization

    from apkit.config import AppConfig
    from apkit.server import ActivityPubServer
    from apkit.server.types import Context, ActorKey
    from apkit.server.responses import ActivityResponse
    from apkit.models import (
        Actor, Application, CryptographicKey, Follow, Create, Note, Mention,
        Actor as APKitActor, OrderedCollection,
    )
    from apkit.client import WebfingerResource, WebfingerResult, WebfingerLink
    from apkit.client.asyncio.client import ActivityPubClient

    # --- Logging Setup ---
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # --- Basic Configuration ---
    HOST = "your.host.com"  # Replace with your domain
    USER_ID = "reminder"    # The bot's username

Make sure to replace your.host.com with the actual domain where your bot will be hosted. These values determine your bot's unique identifier (e.g., @reminder@your.host.com).
2. Key Generation and Persistence
ActivityPub uses HTTP Signatures to secure communication between servers. This requires each actor to have a public/private key pair. The following code generates a private key and saves it to a file if one doesn't already exist.

    # main.py (continued)

    # --- Key Persistence ---
    KEY_FILE = "private_key.pem"

    # Load the private key if it exists, otherwise generate a new one
    if os.path.exists(KEY_FILE):
        logger.info(f"Loading existing private key from {KEY_FILE}.")
        with open(KEY_FILE, "rb") as f:
            private_key = crypto_serialization.load_pem_private_key(f.read(), password=None)
    else:
        logger.info(f"No key file found. Generating new private key and saving to {KEY_FILE}.")
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        with open(KEY_FILE, "wb") as f:
            f.write(private_key.private_bytes(
                encoding=crypto_serialization.Encoding.PEM,
                format=crypto_serialization.PrivateFormat.PKCS8,
                encryption_algorithm=crypto_serialization.NoEncryption()
            ))

    # Generate the public key from the private key
    public_key_pem = private_key.public_key().public_bytes(
        encoding=crypto_serialization.Encoding.PEM,
        format=crypto_serialization.PublicFormat.SubjectPublicKeyInfo
    ).decode('utf-8')

3. Actor Definition
Next, we define the bot's Actor. The Actor is the bot's identity in the ActivityPub network. We use the Application type, as this entity is automated.

    # main.py (continued)

    # --- Actor Definition ---
    actor = Application(
        id=f"https://{HOST}/actor",
        name="Reminder Bot",
        preferredUsername=USER_ID,
        summary="A bot that sends you reminders. Mention me like: @reminder 5m Check the oven",
        inbox=f"https://{HOST}/inbox",    # Endpoint for receiving activities
        outbox=f"https://{HOST}/outbox",  # Endpoint for sending activities
        publicKey=CryptographicKey(
            id=f"https://{HOST}/actor#main-key",
            owner=f"https://{HOST}/actor",
            publicKeyPem=public_key_pem
        )
    )

4. Server Initialization
We initialize the ActivityPubServer from apkit, providing it with a function to retrieve our Actor's keys for signing outgoing activities.

    # main.py (continued)

    # --- Key Retrieval Function ---
    async def get_keys_for_actor(identifier: str) -> list[ActorKey]:
        """Returns the key for a given Actor ID."""
        if identifier == actor.id:
            return [ActorKey(key_id=actor.publicKey.id, private_key=private_key)]
        return []

    # --- Server Initialization ---
    app = ActivityPubServer(apkit_config=AppConfig(
        actor_keys=get_keys_for_actor  # Register the key retrieval function
    ))

5. In-Memory Storage and Cache
To serve created activities, we need to store them somewhere. For simplicity, this example uses a basic in-memory dictionary as a store and a cache. In a production application, you would replace this with a persistent database (like SQLite or PostgreSQL) and a proper cache (like Redis).

    # main.py (continued)

    # --- In-memory Store and Cache ---
    ACTIVITY_STORE = {}  # A simple dict to store created activities
    CACHE = {}           # A cache for recently accessed activities
    CACHE_TTL = timedelta(minutes=5)  # Cache expiration time (5 minutes)

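The store-then-cache lookup that the endpoints later repeat can be sketched as a single helper. This is only an illustration: `get_cached` is a hypothetical name that does not appear in the tutorial's code, and the optional `now` parameter is added here purely to make the expiry behaviour easy to test.

```python
from datetime import datetime, timedelta
from typing import Any, Optional

ACTIVITY_STORE: dict = {}          # same role as the tutorial's store
CACHE: dict = {}                   # same role as the tutorial's cache
CACHE_TTL = timedelta(minutes=5)   # cache expiration time


def get_cached(uri: str, now: Optional[datetime] = None) -> Any:
    """Return the object stored under `uri`, or None if it is unknown.

    A fresh cache entry is returned directly; otherwise the store is
    consulted and the cache entry is refreshed. (Hypothetical helper.)
    """
    now = now or datetime.now()
    entry = CACHE.get(uri)
    if entry is not None and now - entry["timestamp"] < CACHE_TTL:
        return entry["activity"]
    activity = ACTIVITY_STORE.get(uri)
    if activity is not None:
        CACHE[uri] = {"activity": activity, "timestamp": now}
    return activity
```

With a helper like this, each object endpoint could reduce to one lookup followed by a 404 when the result is None.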
6. Reminder Parsing and Sending Logic
This is the core logic of our bot. The parse_reminder function uses a regular expression to extract the delay and message from a mention, and send_reminder schedules the notification.

    # main.py (continued)

    # --- Reminder Parsing Logic ---
    def parse_reminder(text: str) -> tuple[timedelta | None, str | None, str | None]:
        """Parses reminder text like '5m do something'."""
        # ... (implementation omitted for brevity)

    # --- Reminder Sending Function ---
    async def send_reminder(ctx: Context, delay: timedelta, message: str, target_actor: APKitActor, original_note: Note):
        """Waits for a specified delay and then sends a reminder."""
        logger.info(f"Scheduling reminder for {target_actor.id} in {delay}: '{message}'")
        await asyncio.sleep(delay.total_seconds())  # Asynchronously wait

        logger.info(f"Sending reminder to {target_actor.id}")

        # Create the reminder Note
        reminder_note = Note(...)
        # Wrap it in a Create activity
        reminder_create = Create(...)

        # Store the created activities
        ACTIVITY_STORE[reminder_note.id] = reminder_note
        ACTIVITY_STORE[reminder_create.id] = reminder_create

        # Send the activity to the target actor's inbox
        keys = await get_keys_for_actor(f"https://{HOST}/actor")
        await ctx.send(keys, target_actor, reminder_create)
        logger.info(f"Reminder sent to {target_actor.id}")

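Since the body of parse_reminder is omitted above, here is one way such a parser might look. It is a sketch under assumptions, not the author's implementation: the name `parse_reminder_sketch`, the supported units (s, m, h, d), and the exact regular expression are all choices made for this illustration. It returns the same three-part shape the handler expects: the delay, the message, and a short time string.

```python
import re
from datetime import timedelta
from typing import Optional

# Assumed unit table; the original tutorial does not list supported units.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

# "<number><unit> <message>", e.g. "10m check the oven"
_REMINDER_RE = re.compile(
    r"^\s*(\d+)\s*([smhd])\s+(.+?)\s*$", re.IGNORECASE | re.DOTALL
)


def parse_reminder_sketch(
    text: str,
) -> tuple[Optional[timedelta], Optional[str], Optional[str]]:
    """Parse text like '5m do something' into (delay, message, time_str).

    Returns (None, None, None) when the text does not match.
    """
    match = _REMINDER_RE.match(text)
    if not match:
        return None, None, None
    amount, unit, message = match.groups()
    unit = unit.lower()
    delay = timedelta(seconds=int(amount) * _UNIT_SECONDS[unit])
    return delay, message, f"{amount}{unit}"
```

Note that the text passed in is assumed to be plain text with the leading mention already removed; Note content arriving over ActivityPub is usually HTML, so a real handler would need to strip markup first.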
7. Endpoint Definitions
We define the required ActivityPub endpoints. Since apkit is built on FastAPI, we can use standard FastAPI decorators. The main endpoints are:
- Webfinger: Allows users on other servers to discover the bot using an address like @user@host. This is a crucial first step for federation.
- /actor: Serves the bot's Actor object, which contains its profile information and public key.
- /inbox: The endpoint where the bot receives activities from other servers. apkit handles this route automatically, directing activities to the handlers we'll define in the next step.
- /outbox: A collection of the activities created by the bot. In this example, it returns only a placeholder collection.
- /notes/{note_id} and /creates/{create_id}: Endpoints to serve specific objects created by the bot, allowing other servers to fetch them by their unique ID.
Here is the code for defining these endpoints:

    # main.py (continued)

    # The inbox endpoint is handled by apkit automatically.
    app.inbox("/inbox")

    @app.webfinger()
    async def webfinger_endpoint(request: Request, acct: WebfingerResource) -> Response:
        """Handles Webfinger requests to make the bot discoverable."""
        if not acct.url:
            # Handle resource queries like acct:user@host
            if acct.username == USER_ID and acct.host == HOST:
                link = WebfingerLink(rel="self", type="application/activity+json", href=actor.id)
                wf_result = WebfingerResult(subject=acct, links=[link])
                return JSONResponse(wf_result.to_json(), media_type="application/jrd+json")
        else:
            # Handle resource queries using a URL
            if acct.url == f"https://{HOST}/actor":
                link = WebfingerLink(rel="self", type="application/activity+json", href=actor.id)
                wf_result = WebfingerResult(subject=acct, links=[link])
                return JSONResponse(wf_result.to_json(), media_type="application/jrd+json")
        return JSONResponse({"message": "Not Found"}, status_code=404)

    @app.get("/actor")
    async def get_actor_endpoint():
        """Serves the bot's Actor object."""
        return ActivityResponse(actor)

    @app.get("/outbox")
    async def get_outbox_endpoint():
        """Serves a collection of the bot's sent activities."""
        items = sorted(ACTIVITY_STORE.values(), key=lambda x: x.id, reverse=True)
        outbox_collection = OrderedCollection(
            id=actor.outbox,
            totalItems=len(items),
            orderedItems=items
        )
        return ActivityResponse(outbox_collection)

    @app.get("/notes/{note_id}")
    async def get_note_endpoint(note_id: uuid.UUID):
        """Serves a specific Note object, with caching."""
        note_uri = f"https://{HOST}/notes/{note_id}"

        # Check cache first
        if note_uri in CACHE and (datetime.now() - CACHE[note_uri]["timestamp"]) < CACHE_TTL:
            return ActivityResponse(CACHE[note_uri]["activity"])

        # If not in cache, get from store
        if note_uri in ACTIVITY_STORE:
            activity = ACTIVITY_STORE[note_uri]
            # Add to cache before returning
            CACHE[note_uri] = {"activity": activity, "timestamp": datetime.now()}
            return ActivityResponse(activity)

        return Response(status_code=404)  # Not Found

    @app.get("/creates/{create_id}")
    async def get_create_endpoint(create_id: uuid.UUID):
        """Serves a specific Create activity, with caching."""
        create_uri = f"https://{HOST}/creates/{create_id}"

        if create_uri in CACHE and (datetime.now() - CACHE[create_uri]["timestamp"]) < CACHE_TTL:
            return ActivityResponse(CACHE[create_uri]["activity"])

        if create_uri in ACTIVITY_STORE:
            activity = ACTIVITY_STORE[create_uri]
            CACHE[create_uri] = {"activity": activity, "timestamp": datetime.now()}
            return ActivityResponse(activity)

        return Response(status_code=404)

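To make the Webfinger step more concrete, the sketch below builds the kind of JRD document (RFC 7033) that a Webfinger endpoint typically returns for an ActivityPub actor. It uses a plain dictionary rather than apkit's classes, and `build_jrd` is a hypothetical helper; the exact JSON produced by `WebfingerResult.to_json()` may differ in detail.

```python
import json


def build_jrd(username: str, host: str, actor_url: str) -> dict:
    """Build a minimal Webfinger JRD pointing at an ActivityPub actor.

    Hypothetical helper for illustration; not part of apkit.
    """
    return {
        "subject": f"acct:{username}@{host}",
        "links": [
            {
                "rel": "self",
                "type": "application/activity+json",
                "href": actor_url,
            }
        ],
    }


if __name__ == "__main__":
    jrd = build_jrd("reminder", "your.host.com", "https://your.host.com/actor")
    print(json.dumps(jrd, indent=2))
```

A remote server that looks up @reminder@your.host.com follows the "self" link to fetch the Actor document served at /actor.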
8. Activity Handlers
We use the @app.on() decorator to define handlers for specific activity types posted to our inbox.
- on_follow_activity: Automatically accepts Follow requests.
- on_create_activity: Parses incoming Create activities (specifically for Note objects) to schedule reminders.

    # main.py (continued)

    # Handler for Follow activities
    @app.on(Follow)
    async def on_follow_activity(ctx: Context):
        """Automatically accepts follow requests."""
        # ... (implementation omitted for brevity)

    # Handler for Create activities
    @app.on(Create)
    async def on_create_activity(ctx: Context):
        """Parses mentions to schedule reminders."""
        activity = ctx.activity
        # Ignore if it's not a Note
        if not (isinstance(activity, Create) and isinstance(activity.object, Note)):
            return Response(status_code=202)

        note = activity.object

        # Check if the bot was mentioned
        is_mentioned = any(
            isinstance(tag, Mention) and tag.href == actor.id
            for tag in (note.tag or [])
        )
        if not is_mentioned:
            return Response(status_code=202)

        # ... (Parse reminder text)
        delay, message, time_str = parse_reminder(command_text)

        # If parsing is successful, schedule the reminder as a background task
        if delay and message and sender_actor:
            asyncio.create_task(send_reminder(ctx, delay, message, sender_actor, note))
            reply_content = f"<p>✅ OK! I will remind you in {time_str}.</p>"
        else:
            # If parsing fails, send usage instructions
            reply_content = "<p>🤔 Sorry, I didn\'t understand. Please use the format: `@reminder [time] [message]`.</p><p>Example: `@reminder 10m Check the oven`</p>"

        # ... (Create and send the reply Note)

9. Running the Application
Finally, we run the application using uvicorn.

    # main.py (continued)

    if __name__ == "__main__":
        import uvicorn
        logger.info("Starting uvicorn server...")
        uvicorn.run(app, host="0.0.0.0", port=8000)

How to Run the Bot
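- Set the HOST and USER_ID variables in main.py to match your environment.
- Run the server from your terminal:

    uvicorn main:app --host 0.0.0.0 --port 8000

- Your bot will be listening on port 8000 on all interfaces (http://0.0.0.0:8000). Because the Actor's URLs use https://{HOST}, other servers need to reach the bot over HTTPS at that domain, for example through a reverse proxy.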
Now you can mention your bot from anywhere in the Fediverse (e.g., @reminder@your.host.com) to set a reminder.
Next Steps
This tutorial covers the basics of creating a simple ActivityPub bot. Since it only uses in-memory storage, all reminders will be lost on server restart. Here are some potential improvements:
- Persistent Storage: Replace the in-memory ACTIVITY_STORE with a database like SQLite or PostgreSQL.
- Robust Task Queuing: Use a dedicated task queue like Celery with a Redis or RabbitMQ broker to ensure reminders are not lost if the server restarts.
- Advanced Commands: Add support for more complex commands, such as recurring reminders.
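As a rough illustration of the persistent-storage idea, the sketch below keeps pending reminders in SQLite using only the standard library, so they could be reloaded after a restart. The table layout and the function names (`open_reminder_db`, `add_reminder`, `due_reminders`, `mark_sent`) are assumptions made for this example and are not part of the tutorial's code.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional


def open_reminder_db(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the reminders database. Hypothetical schema."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS reminders (
               id INTEGER PRIMARY KEY AUTOINCREMENT,
               target_actor TEXT NOT NULL,
               message TEXT NOT NULL,
               due_at REAL NOT NULL,          -- Unix timestamp (UTC)
               sent INTEGER NOT NULL DEFAULT 0
           )"""
    )
    conn.commit()
    return conn


def add_reminder(conn, target_actor: str, message: str, delay: timedelta,
                 now: Optional[datetime] = None) -> int:
    """Persist a reminder due `delay` after `now`; returns its row id."""
    now = now or datetime.now(timezone.utc)
    cur = conn.execute(
        "INSERT INTO reminders (target_actor, message, due_at) VALUES (?, ?, ?)",
        (target_actor, message, (now + delay).timestamp()),
    )
    conn.commit()
    return cur.lastrowid


def due_reminders(conn, now: Optional[datetime] = None) -> list:
    """Return unsent reminders whose due time has passed, oldest first."""
    now = now or datetime.now(timezone.utc)
    rows = conn.execute(
        "SELECT id, target_actor, message FROM reminders "
        "WHERE sent = 0 AND due_at <= ? ORDER BY due_at",
        (now.timestamp(),),
    )
    return rows.fetchall()


def mark_sent(conn, reminder_id: int) -> None:
    """Flag a reminder as delivered so it is not sent again."""
    conn.execute("UPDATE reminders SET sent = 1 WHERE id = ?", (reminder_id,))
    conn.commit()
```

On startup, a bot built this way could call due_reminders for anything overdue and reschedule the rest, instead of relying solely on in-flight asyncio tasks.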
We hope this guide serves as a good starting point for building your own ActivityPub applications!
https://fedi-libs.github.io/apkit/