aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFuwn <[email protected]>2025-08-27 17:49:43 -0700
committerFuwn <[email protected]>2025-08-27 17:49:43 -0700
commit6650dcfee1c8aefedf7c9330566c0bb7ecb1a1d3 (patch)
tree0fa54793cc077dc75b5086a2badf00457c6c8b25 /src
downloadumabot-6650dcfee1c8aefedf7c9330566c0bb7ecb1a1d3.tar.xz
umabot-6650dcfee1c8aefedf7c9330566c0bb7ecb1a1d3.zip
feat: Initial commit
Diffstat (limited to 'src')
-rw-r--r--src/umabot/__init__.py13
-rw-r--r--src/umabot/__main__.py6
-rw-r--r--src/umabot/bot.py136
-rw-r--r--src/umabot/cli.py107
-rw-r--r--src/umabot/config.py72
-rw-r--r--src/umabot/rules/__init__.py7
-rw-r--r--src/umabot/rules/base.py58
-rw-r--r--src/umabot/rules/example_rule.py56
-rw-r--r--src/umabot/rules/roleplay_limiter.py60
-rw-r--r--src/umabot/rules/spam_detector.py63
10 files changed, 578 insertions, 0 deletions
diff --git a/src/umabot/__init__.py b/src/umabot/__init__.py
new file mode 100644
index 0000000..1b6b859
--- /dev/null
+++ b/src/umabot/__init__.py
@@ -0,0 +1,13 @@
+"""UmaBot - A modular Reddit bot for post moderation."""
+
+from .bot import UmaBot
+from .config import Config
+
+__version__ = "0.1.0"
+__all__ = ["UmaBot", "Config"]
+
+
+def main():
+ """Main entry point for the bot."""
+ from .cli import run_bot
+ run_bot()
diff --git a/src/umabot/__main__.py b/src/umabot/__main__.py
new file mode 100644
index 0000000..61daa90
--- /dev/null
+++ b/src/umabot/__main__.py
@@ -0,0 +1,6 @@
+"""Main module entry point for UmaBot."""
+
+from .cli import run_bot
+
+if __name__ == "__main__":
+ run_bot()
diff --git a/src/umabot/bot.py b/src/umabot/bot.py
new file mode 100644
index 0000000..3db29e9
--- /dev/null
+++ b/src/umabot/bot.py
@@ -0,0 +1,136 @@
+"""Main bot class for UmaBot."""
+
+import time
+import praw
+from typing import List
+from loguru import logger
+
+from .config import Config
+from .rules import SpamDetector, RoleplayLimiter
+
+
+class UmaBot:
+ """Main Reddit bot class."""
+
+ def __init__(self, config: Config):
+ """Initialize the bot with configuration."""
+ self.config = config
+ self.logger = logger.bind(bot="UmaBot")
+
+ # Initialize Reddit client
+ self.reddit = praw.Reddit(
+ client_id=config.client_id,
+ client_secret=config.client_secret,
+ username=config.username,
+ password=config.password,
+ user_agent=config.user_agent
+ )
+
+ # Get subreddit
+ self.subreddit = self.reddit.subreddit(config.subreddit_name)
+
+ # Initialize rules
+ self.rules = [
+ SpamDetector(config),
+ RoleplayLimiter(config)
+ ]
+
+ # Track processed submissions to avoid processing old posts
+ self.processed_submissions = set()
+ self.initialized = False
+
+ self.logger.info(f"Bot initialized for r/{config.subreddit_name}")
+
+ def run(self):
+ """Run the bot continuously."""
+ self.logger.info("Starting UmaBot...")
+
+ try:
+ while True:
+ self._check_new_posts()
+ time.sleep(self.config.check_interval)
+
+ except KeyboardInterrupt:
+ self.logger.info("Bot stopped by user")
+ except Exception as e:
+ self.logger.error(f"Bot crashed: {e}")
+ raise
+
+ def _check_new_posts(self):
+ """Check for new posts and apply rules."""
+ try:
+ # Get new submissions
+ new_submissions = list(self.subreddit.new(limit=25))
+
+ if not new_submissions:
+ self.logger.debug("No new submissions found")
+ return
+
+ # On first run, just populate the processed set without processing
+ if not self.initialized:
+ self.logger.info("Initializing bot - marking existing posts as processed")
+ for submission in new_submissions:
+ self.processed_submissions.add(submission.id)
+ self.initialized = True
+ self.logger.info(f"Bot initialized with {len(self.processed_submissions)} existing posts marked as processed")
+ return
+
+ # Filter out already processed submissions
+ truly_new_submissions = []
+ for submission in new_submissions:
+ if submission.id not in self.processed_submissions:
+ truly_new_submissions.append(submission)
+ self.processed_submissions.add(submission.id)
+
+ if not truly_new_submissions:
+ self.logger.debug("No truly new submissions found")
+ return
+
+ self.logger.info(f"Processing {len(truly_new_submissions)} new submissions")
+
+ # Apply rules to each new submission
+ for submission in truly_new_submissions:
+ self._apply_rules(submission)
+
+ # Clean up old processed submissions periodically
+ self._cleanup_processed_submissions()
+
+ except Exception as e:
+ self.logger.error(f"Error checking new posts: {e}")
+
+ def _apply_rules(self, submission):
+ """Apply all rules to a submission."""
+ try:
+ # Apply each rule
+ for rule in self.rules:
+ if rule.execute(submission):
+ # If a rule removes the submission, stop applying other rules
+ break
+
+ except Exception as e:
+ self.logger.error(f"Error applying rules to {submission.id}: {e}")
+
+ def add_rule(self, rule):
+ """Add a new rule to the bot."""
+ self.rules.append(rule)
+ self.logger.info(f"Added new rule: {rule.__class__.__name__}")
+
+ def test_connection(self):
+ """Test the Reddit API connection."""
+ try:
+ # Try to access the subreddit
+ subreddit_name = self.subreddit.display_name
+ self.logger.info(f"Successfully connected to r/{subreddit_name}")
+ return True
+ except Exception as e:
+ self.logger.error(f"Failed to connect to Reddit: {e}")
+ return False
+
+ def _cleanup_processed_submissions(self):
+ """Clean up old processed submission IDs to prevent memory bloat."""
+ # Keep only the last 1000 processed submissions
+ if len(self.processed_submissions) > 1000:
+ # Convert to list, keep last 1000, convert back to set
+ submissions_list = list(self.processed_submissions)
+ self.processed_submissions = set(submissions_list[-1000:])
+ self.logger.debug(f"Cleaned up processed submissions, keeping {len(self.processed_submissions)} most recent")
diff --git a/src/umabot/cli.py b/src/umabot/cli.py
new file mode 100644
index 0000000..29999ec
--- /dev/null
+++ b/src/umabot/cli.py
@@ -0,0 +1,107 @@
+"""Command-line interface for UmaBot."""
+
+import sys
+import argparse
+from loguru import logger
+
+from .config import Config
+from .bot import UmaBot
+
+
+def setup_logging(verbose: bool = False):
+ """Setup logging configuration."""
+ log_level = "DEBUG" if verbose else "INFO"
+
+ logger.remove() # Remove default handler
+ logger.add(
+ sys.stderr,
+ level=log_level,
+ format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
+ )
+
+ # Also log to file
+ logger.add(
+ "umabot.log",
+ rotation="1 day",
+ retention="7 days",
+ level=log_level,
+ format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
+ )
+
+
+def run_bot():
+ """Main entry point for the bot."""
+ parser = argparse.ArgumentParser(
+ description="UmaBot - A modular Reddit bot for post moderation"
+ )
+
+ parser.add_argument(
+ "--verbose", "-v",
+ action="store_true",
+ help="Enable verbose logging"
+ )
+
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Run in dry-run mode (don't actually remove posts)"
+ )
+
+ parser.add_argument(
+ "--test",
+ action="store_true",
+ help="Test Reddit connection and exit"
+ )
+
+ args = parser.parse_args()
+
+ # Setup logging
+ setup_logging(args.verbose)
+
+ try:
+ # Load configuration
+ config = Config.from_env()
+
+ # Override dry-run if specified
+ if args.dry_run:
+ config.dry_run = True
+
+ # Validate configuration
+ config.validate()
+
+ # Create bot
+ bot = UmaBot(config)
+
+ # Test connection if requested
+ if args.test:
+ if bot.test_connection():
+ logger.info("Connection test successful!")
+ return 0
+ else:
+ logger.error("Connection test failed!")
+ return 1
+
+ # Run the bot
+ bot.run()
+
+ except ValueError as e:
+ logger.error(f"Configuration error: {e}")
+ logger.info("Please check your environment variables:")
+ logger.info(" REDDIT_CLIENT_ID")
+ logger.info(" REDDIT_CLIENT_SECRET")
+ logger.info(" REDDIT_USERNAME")
+ logger.info(" REDDIT_PASSWORD")
+ logger.info(" SUBREDDIT_NAME")
+ return 1
+
+ except KeyboardInterrupt:
+ logger.info("Bot stopped by user")
+ return 0
+
+ except Exception as e:
+ logger.error(f"Unexpected error: {e}")
+ return 1
+
+
+if __name__ == "__main__":
+ sys.exit(run_bot())
diff --git a/src/umabot/config.py b/src/umabot/config.py
new file mode 100644
index 0000000..93ebf54
--- /dev/null
+++ b/src/umabot/config.py
@@ -0,0 +1,72 @@
+"""Configuration management for UmaBot."""
+
+import os
+from dataclasses import dataclass
+from typing import Optional
+from dotenv import load_dotenv
+
+load_dotenv()
+
+
+@dataclass
+class Config:
+ """Configuration for the Reddit bot."""
+
+ # Reddit API credentials
+ client_id: str
+ client_secret: str
+ username: str
+ password: str
+ user_agent: str
+
+ # Subreddit configuration
+ subreddit_name: str
+
+ # Bot messages
+ spam_message: str
+ roleplay_message: str
+
+ # Bot settings
+ check_interval: int = 60 # seconds
+ max_posts_per_day: int = 3
+ dry_run: bool = False
+
+ @classmethod
+ def from_env(cls) -> "Config":
+ """Create configuration from environment variables."""
+ return cls(
+ client_id=os.getenv("REDDIT_CLIENT_ID", ""),
+ client_secret=os.getenv("REDDIT_CLIENT_SECRET", ""),
+ username=os.getenv("REDDIT_USERNAME", ""),
+ password=os.getenv("REDDIT_PASSWORD", ""),
+ user_agent=os.getenv("REDDIT_USER_AGENT", "UmaBot/0.1.0"),
+ subreddit_name=os.getenv("SUBREDDIT_NAME", ""),
+ spam_message=os.getenv(
+ "SPAM_MESSAGE",
+ "Your post has been removed for posting too frequently. Please wait before posting again."
+ ),
+ roleplay_message=os.getenv(
+ "ROLEPLAY_MESSAGE",
+ "Your post has been removed. Only one roleplay post is allowed per user."
+ ),
+ check_interval=int(os.getenv("CHECK_INTERVAL", "60")),
+ max_posts_per_day=int(os.getenv("MAX_POSTS_PER_DAY", "3")),
+ dry_run=os.getenv("DRY_RUN", "false").lower() == "true",
+ )
+
+ def validate(self) -> None:
+ """Validate that all required configuration is present."""
+ required_fields = [
+ "client_id", "client_secret", "username",
+ "password", "subreddit_name"
+ ]
+
+ missing_fields = []
+ for field in required_fields:
+ if not getattr(self, field):
+ missing_fields.append(field)
+
+ if missing_fields:
+ raise ValueError(
+ f"Missing required configuration: {', '.join(missing_fields)}"
+ )
diff --git a/src/umabot/rules/__init__.py b/src/umabot/rules/__init__.py
new file mode 100644
index 0000000..e912e70
--- /dev/null
+++ b/src/umabot/rules/__init__.py
@@ -0,0 +1,7 @@
+"""Moderation rules for UmaBot."""
+
+from .base import Rule
+from .spam_detector import SpamDetector
+from .roleplay_limiter import RoleplayLimiter
+
+__all__ = ["Rule", "SpamDetector", "RoleplayLimiter"]
diff --git a/src/umabot/rules/base.py b/src/umabot/rules/base.py
new file mode 100644
index 0000000..0318aaa
--- /dev/null
+++ b/src/umabot/rules/base.py
@@ -0,0 +1,58 @@
+"""Base rule class for moderation rules."""
+
+from abc import ABC, abstractmethod
+from typing import List, Optional
+import praw.models
+from loguru import logger
+
+
+class Rule(ABC):
+ """Base class for all moderation rules."""
+
+ def __init__(self, config):
+ """Initialize the rule with configuration."""
+ self.config = config
+ self.logger = logger.bind(rule=self.__class__.__name__)
+
+ @abstractmethod
+ def should_remove(self, submission: praw.models.Submission) -> bool:
+ """Determine if a submission should be removed."""
+ pass
+
+ @abstractmethod
+ def get_removal_message(self, submission: praw.models.Submission) -> str:
+ """Get the message to post when removing a submission."""
+ pass
+
+ def execute(self, submission: praw.models.Submission) -> bool:
+ """Execute the rule on a submission.
+
+ Returns:
+ bool: True if the submission was removed, False otherwise.
+ """
+ if not self.should_remove(submission):
+ return False
+
+ removal_message = self.get_removal_message(submission)
+
+ if self.config.dry_run:
+ self.logger.info(
+ f"[DRY RUN] Would remove submission {submission.id} by {submission.author}"
+ )
+ self.logger.info(f"[DRY RUN] Would post message: {removal_message}")
+ return True
+
+ try:
+ # Remove the submission
+ submission.mod.remove()
+ self.logger.info(f"Removed submission {submission.id} by {submission.author}")
+
+ # Post removal message
+ submission.reply(removal_message)
+ self.logger.info(f"Posted removal message for {submission.id}")
+
+ return True
+
+ except Exception as e:
+ self.logger.error(f"Error executing rule on {submission.id}: {e}")
+ return False
diff --git a/src/umabot/rules/example_rule.py b/src/umabot/rules/example_rule.py
new file mode 100644
index 0000000..68957b4
--- /dev/null
+++ b/src/umabot/rules/example_rule.py
@@ -0,0 +1,56 @@
+"""Example rule demonstrating how to create custom moderation rules."""
+
+import praw.models
+from .base import Rule
+
+
+class ExampleRule(Rule):
+ """Example rule that demonstrates the modular rule system.
+
+ This rule shows how to create a custom rule that:
+ - Checks for specific keywords in post titles
+ - Removes posts containing banned words
+ - Posts a custom removal message
+ """
+
+ def __init__(self, config):
+ """Initialize the example rule."""
+ super().__init__(config)
+ # Define banned words (customize as needed)
+ self.banned_words = ["spam", "advertisement", "buy now"]
+
+ def should_remove(self, submission: praw.models.Submission) -> bool:
+ """Check if a submission contains banned words in the title."""
+ if not submission.title:
+ return False
+
+ title_lower = submission.title.lower()
+
+ # Check if any banned words are in the title
+ for word in self.banned_words:
+ if word in title_lower:
+ self.logger.info(
+ f"Found banned word '{word}' in submission {submission.id}"
+ )
+ return True
+
+ return False
+
+ def get_removal_message(self, submission: praw.models.Submission) -> str:
+ """Get the removal message for posts with banned words."""
+ return (
+ "Your post has been removed because it contains inappropriate content. "
+ "Please review our community guidelines before posting again."
+ )
+
+
+# To use this rule, add it to the bot in src/umabot/bot.py:
+#
+# from .rules import ExampleRule
+#
+# # In the __init__ method:
+# self.rules = [
+# SpamDetector(config),
+# RoleplayLimiter(config),
+# ExampleRule(config) # Add the example rule
+# ]
diff --git a/src/umabot/rules/roleplay_limiter.py b/src/umabot/rules/roleplay_limiter.py
new file mode 100644
index 0000000..2b88079
--- /dev/null
+++ b/src/umabot/rules/roleplay_limiter.py
@@ -0,0 +1,60 @@
+"""Roleplay post limiter rule."""
+
+from typing import Set
+import praw.models
+from .base import Rule
+
+
+class RoleplayLimiter(Rule):
+ """Limits users to one roleplay post."""
+
+ def __init__(self, config):
+ """Initialize the roleplay limiter."""
+ super().__init__(config)
+ self.roleplay_users: Set[str] = set()
+ self.roleplay_flair = "Roleplay"
+
+ def should_remove(self, submission: praw.models.Submission) -> bool:
+ """Check if a user has already posted a roleplay post."""
+ if not submission.author:
+ return False
+
+ # Check if this is a roleplay post
+ if not self._is_roleplay_post(submission):
+ return False
+
+ username = submission.author.name
+
+ # Check if user has already posted a roleplay post
+ if username in self.roleplay_users:
+ self.logger.info(
+ f"User {username} has already posted a roleplay post"
+ )
+ return True
+
+ # Add user to the set of roleplay posters
+ self.roleplay_users.add(username)
+ return False
+
+ def get_removal_message(self, submission: praw.models.Submission) -> str:
+ """Get the roleplay removal message."""
+ return self.config.roleplay_message
+
+ def _is_roleplay_post(self, submission: praw.models.Submission) -> bool:
+ """Check if a submission has the roleplay flair."""
+ try:
+ # Check link flair text
+ if hasattr(submission, 'link_flair_text') and submission.link_flair_text:
+ return submission.link_flair_text.lower() == self.roleplay_flair.lower()
+
+ # Check flair template ID (if using new flair system)
+ if hasattr(submission, 'link_flair_template_id') and submission.link_flair_template_id:
+ # You might need to map flair template IDs to names
+ # For now, we'll just check the text
+ pass
+
+ return False
+
+ except Exception as e:
+ self.logger.error(f"Error checking flair for submission {submission.id}: {e}")
+ return False
diff --git a/src/umabot/rules/spam_detector.py b/src/umabot/rules/spam_detector.py
new file mode 100644
index 0000000..6f6da34
--- /dev/null
+++ b/src/umabot/rules/spam_detector.py
@@ -0,0 +1,63 @@
+"""Spam detection rule for limiting posts per user per day."""
+
+import time
+from datetime import datetime, timedelta
+from typing import Dict, List
+import praw.models
+from .base import Rule
+
+
+class SpamDetector(Rule):
+ """Detects and removes posts from users who post too frequently."""
+
+ def __init__(self, config):
+ """Initialize the spam detector."""
+ super().__init__(config)
+ self.user_posts: Dict[str, List[float]] = {}
+ self.max_posts = config.max_posts_per_day
+ self.time_window = 24 * 60 * 60 # 24 hours in seconds
+
+ def should_remove(self, submission: praw.models.Submission) -> bool:
+ """Check if a user has posted too frequently."""
+ if not submission.author:
+ return False
+
+ username = submission.author.name
+ current_time = time.time()
+
+ # Clean old posts from tracking
+ self._clean_old_posts(username, current_time)
+
+ # Count current posts in the time window
+ if username not in self.user_posts:
+ self.user_posts[username] = []
+
+ post_count = len(self.user_posts[username])
+
+ # Add current post to tracking
+ self.user_posts[username].append(current_time)
+
+ # Check if this post exceeds the limit
+ if post_count >= self.max_posts:
+ self.logger.info(
+ f"User {username} has posted {post_count + 1} times in 24 hours "
+ f"(limit: {self.max_posts})"
+ )
+ return True
+
+ return False
+
+ def get_removal_message(self, submission: praw.models.Submission) -> str:
+ """Get the spam removal message."""
+ return self.config.spam_message
+
+ def _clean_old_posts(self, username: str, current_time: float) -> None:
+ """Remove posts older than the time window from tracking."""
+ if username not in self.user_posts:
+ return
+
+ cutoff_time = current_time - self.time_window
+ self.user_posts[username] = [
+ post_time for post_time in self.user_posts[username]
+ if post_time > cutoff_time
+ ]