From 6650dcfee1c8aefedf7c9330566c0bb7ecb1a1d3 Mon Sep 17 00:00:00 2001 From: Fuwn Date: Wed, 27 Aug 2025 17:49:43 -0700 Subject: feat: Initial commit --- src/umabot/__init__.py | 13 ++++ src/umabot/__main__.py | 6 ++ src/umabot/bot.py | 136 +++++++++++++++++++++++++++++++++++ src/umabot/cli.py | 107 +++++++++++++++++++++++++++ src/umabot/config.py | 72 +++++++++++++++++++ src/umabot/rules/__init__.py | 7 ++ src/umabot/rules/base.py | 58 +++++++++++++++ src/umabot/rules/example_rule.py | 56 +++++++++++++++ src/umabot/rules/roleplay_limiter.py | 60 ++++++++++++++++ src/umabot/rules/spam_detector.py | 63 ++++++++++++++++ 10 files changed, 578 insertions(+) create mode 100644 src/umabot/__init__.py create mode 100644 src/umabot/__main__.py create mode 100644 src/umabot/bot.py create mode 100644 src/umabot/cli.py create mode 100644 src/umabot/config.py create mode 100644 src/umabot/rules/__init__.py create mode 100644 src/umabot/rules/base.py create mode 100644 src/umabot/rules/example_rule.py create mode 100644 src/umabot/rules/roleplay_limiter.py create mode 100644 src/umabot/rules/spam_detector.py (limited to 'src') diff --git a/src/umabot/__init__.py b/src/umabot/__init__.py new file mode 100644 index 0000000..1b6b859 --- /dev/null +++ b/src/umabot/__init__.py @@ -0,0 +1,13 @@ +"""UmaBot - A modular Reddit bot for post moderation.""" + +from .bot import UmaBot +from .config import Config + +__version__ = "0.1.0" +__all__ = ["UmaBot", "Config"] + + +def main(): + """Main entry point for the bot.""" + from .cli import run_bot + run_bot() diff --git a/src/umabot/__main__.py b/src/umabot/__main__.py new file mode 100644 index 0000000..61daa90 --- /dev/null +++ b/src/umabot/__main__.py @@ -0,0 +1,6 @@ +"""Main module entry point for UmaBot.""" + +from .cli import run_bot + +if __name__ == "__main__": + run_bot() diff --git a/src/umabot/bot.py b/src/umabot/bot.py new file mode 100644 index 0000000..3db29e9 --- /dev/null +++ b/src/umabot/bot.py @@ -0,0 +1,136 @@ +"""Main bot class for UmaBot.""" + +import time +import praw +from typing import List +from loguru import logger + +from .config import Config +from .rules import SpamDetector, RoleplayLimiter + + +class UmaBot: + """Main Reddit bot class.""" + + def __init__(self, config: Config): + """Initialize the bot with configuration.""" + self.config = config + self.logger = logger.bind(bot="UmaBot") + + # Initialize Reddit client + self.reddit = praw.Reddit( + client_id=config.client_id, + client_secret=config.client_secret, + username=config.username, + password=config.password, + user_agent=config.user_agent + ) + + # Get subreddit + self.subreddit = self.reddit.subreddit(config.subreddit_name) + + # Initialize rules + self.rules = [ + SpamDetector(config), + RoleplayLimiter(config) + ] + + # Track processed submissions to avoid processing old posts + self.processed_submissions = set() + self.initialized = False + + self.logger.info(f"Bot initialized for r/{config.subreddit_name}") + + def run(self): + """Run the bot continuously.""" + self.logger.info("Starting UmaBot...") + + try: + while True: + self._check_new_posts() + time.sleep(self.config.check_interval) + + except KeyboardInterrupt: + self.logger.info("Bot stopped by user") + except Exception as e: + self.logger.error(f"Bot crashed: {e}") + raise + + def _check_new_posts(self): + """Check for new posts and apply rules.""" + try: + # Get new submissions + new_submissions = list(self.subreddit.new(limit=25)) + + if not new_submissions: + self.logger.debug("No new submissions found") + return + + # On first run, just populate the processed set without processing + if not self.initialized: + self.logger.info("Initializing bot - marking existing posts as processed") + for submission in new_submissions: + self.processed_submissions.add(submission.id) + self.initialized = True + self.logger.info(f"Bot initialized with {len(self.processed_submissions)} existing posts marked as processed") + return + + # Filter out already processed submissions + truly_new_submissions = [] + for submission in new_submissions: + if submission.id not in self.processed_submissions: + truly_new_submissions.append(submission) + self.processed_submissions.add(submission.id) + + if not truly_new_submissions: + self.logger.debug("No truly new submissions found") + return + + self.logger.info(f"Processing {len(truly_new_submissions)} new submissions") + + # Apply rules to each new submission + for submission in truly_new_submissions: + self._apply_rules(submission) + + # Clean up old processed submissions periodically + self._cleanup_processed_submissions() + + except Exception as e: + self.logger.error(f"Error checking new posts: {e}") + + def _apply_rules(self, submission): + """Apply all rules to a submission.""" + try: + # Apply each rule + for rule in self.rules: + if rule.execute(submission): + # If a rule removes the submission, stop applying other rules + break + + except Exception as e: + self.logger.error(f"Error applying rules to {submission.id}: {e}") + + def add_rule(self, rule): + """Add a new rule to the bot.""" + self.rules.append(rule) + self.logger.info(f"Added new rule: {rule.__class__.__name__}") + + def test_connection(self): + """Test the Reddit API connection.""" + try: + # Try to access the subreddit + subreddit_name = self.subreddit.display_name + self.logger.info(f"Successfully connected to r/{subreddit_name}") + return True + except Exception as e: + self.logger.error(f"Failed to connect to Reddit: {e}") + return False + + def _cleanup_processed_submissions(self): + """Clean up old processed submission IDs to prevent memory bloat.""" + # Keep only the last 1000 processed submissions + if len(self.processed_submissions) > 1000: + # Convert to list, keep last 1000, convert back to set + submissions_list = list(self.processed_submissions) + self.processed_submissions = set(submissions_list[-1000:]) + self.logger.debug(f"Cleaned up processed submissions, keeping {len(self.processed_submissions)} most recent") diff --git a/src/umabot/cli.py b/src/umabot/cli.py new file mode 100644 index 0000000..29999ec --- /dev/null +++ b/src/umabot/cli.py @@ -0,0 +1,107 @@ +"""Command-line interface for UmaBot.""" + +import sys +import argparse +from loguru import logger + +from .config import Config +from .bot import UmaBot + + +def setup_logging(verbose: bool = False): + """Setup logging configuration.""" + log_level = "DEBUG" if verbose else "INFO" + + logger.remove() # Remove default handler + logger.add( + sys.stderr, + level=log_level, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}" + ) + + # Also log to file + logger.add( + "umabot.log", + rotation="1 day", + retention="7 days", + level=log_level, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}" + ) + + +def run_bot(): + """Main entry point for the bot.""" + parser = argparse.ArgumentParser( + description="UmaBot - A modular Reddit bot for post moderation" + ) + + parser.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable verbose logging" + ) + + parser.add_argument( + "--dry-run", + action="store_true", + help="Run in dry-run mode (don't actually remove posts)" + ) + + parser.add_argument( + "--test", + action="store_true", + help="Test Reddit connection and exit" + ) + + args = parser.parse_args() + + # Setup logging + setup_logging(args.verbose) + + try: + # Load configuration + config = Config.from_env() + + # Override dry-run if specified + if args.dry_run: + config.dry_run = True + + # Validate configuration + config.validate() + + # Create bot + bot = UmaBot(config) + + # Test connection if requested + if args.test: + if bot.test_connection(): + logger.info("Connection test successful!") + return 0 + else: + logger.error("Connection test failed!") + return 1 + + # Run the bot + bot.run() + + except ValueError as e: + logger.error(f"Configuration error: {e}") + logger.info("Please check your environment variables:") + logger.info(" REDDIT_CLIENT_ID") + logger.info(" REDDIT_CLIENT_SECRET") + logger.info(" REDDIT_USERNAME") + logger.info(" REDDIT_PASSWORD") + logger.info(" SUBREDDIT_NAME") + return 1 + + except KeyboardInterrupt: + logger.info("Bot stopped by user") + return 0 + + except Exception as e: + logger.error(f"Unexpected error: {e}") + return 1 + + +if __name__ == "__main__": + sys.exit(run_bot()) diff --git a/src/umabot/config.py b/src/umabot/config.py new file mode 100644 index 0000000..93ebf54 --- /dev/null +++ b/src/umabot/config.py @@ -0,0 +1,72 @@ +"""Configuration management for UmaBot.""" + +import os +from dataclasses import dataclass +from typing import Optional +from dotenv import load_dotenv + +load_dotenv() + + +@dataclass +class Config: + """Configuration for the Reddit bot.""" + + # Reddit API credentials + client_id: str + client_secret: str + username: str + password: str + user_agent: str + + # Subreddit configuration + subreddit_name: str + + # Bot messages + spam_message: str + roleplay_message: str + + # Bot settings + check_interval: int = 60 # seconds + max_posts_per_day: int = 3 + dry_run: bool = False + + @classmethod + def from_env(cls) -> "Config": + """Create configuration from environment variables.""" + return cls( + client_id=os.getenv("REDDIT_CLIENT_ID", ""), + client_secret=os.getenv("REDDIT_CLIENT_SECRET", ""), + username=os.getenv("REDDIT_USERNAME", ""), + password=os.getenv("REDDIT_PASSWORD", ""), + user_agent=os.getenv("REDDIT_USER_AGENT", "UmaBot/0.1.0"), + subreddit_name=os.getenv("SUBREDDIT_NAME", ""), + spam_message=os.getenv( + "SPAM_MESSAGE", + "Your post has been removed for posting too frequently. Please wait before posting again." + ), + roleplay_message=os.getenv( + "ROLEPLAY_MESSAGE", + "Your post has been removed. Only one roleplay post is allowed per user." + ), + check_interval=int(os.getenv("CHECK_INTERVAL", "60")), + max_posts_per_day=int(os.getenv("MAX_POSTS_PER_DAY", "3")), + dry_run=os.getenv("DRY_RUN", "false").lower() == "true", + ) + + def validate(self) -> None: + """Validate that all required configuration is present.""" + required_fields = [ + "client_id", "client_secret", "username", + "password", "subreddit_name" + ] + + missing_fields = [] + for field in required_fields: + if not getattr(self, field): + missing_fields.append(field) + + if missing_fields: + raise ValueError( + f"Missing required configuration: {', '.join(missing_fields)}" + ) diff --git a/src/umabot/rules/__init__.py b/src/umabot/rules/__init__.py new file mode 100644 index 0000000..e912e70 --- /dev/null +++ b/src/umabot/rules/__init__.py @@ -0,0 +1,7 @@ +"""Moderation rules for UmaBot.""" + +from .base import Rule +from .spam_detector import SpamDetector +from .roleplay_limiter import RoleplayLimiter + +__all__ = ["Rule", "SpamDetector", "RoleplayLimiter"] diff --git a/src/umabot/rules/base.py b/src/umabot/rules/base.py new file mode 100644 index 0000000..0318aaa --- /dev/null +++ b/src/umabot/rules/base.py @@ -0,0 +1,58 @@ +"""Base rule class for moderation rules.""" + +from abc import ABC, abstractmethod +from typing import List, Optional +import praw.models +from loguru import logger + + +class Rule(ABC): + """Base class for all moderation rules.""" + + def __init__(self, config): + """Initialize the rule with configuration.""" + self.config = config + self.logger = logger.bind(rule=self.__class__.__name__) + + @abstractmethod + def should_remove(self, submission: praw.models.Submission) -> bool: + """Determine if a submission should be removed.""" + pass + + @abstractmethod + def get_removal_message(self, submission: praw.models.Submission) -> str: + """Get the message to post when removing a submission.""" + pass + + def execute(self, submission: praw.models.Submission) -> bool: + """Execute the rule on a submission. + + Returns: + bool: True if the submission was removed, False otherwise. + """ + if not self.should_remove(submission): + return False + + removal_message = self.get_removal_message(submission) + + if self.config.dry_run: + self.logger.info( + f"[DRY RUN] Would remove submission {submission.id} by {submission.author}" + ) + self.logger.info(f"[DRY RUN] Would post message: {removal_message}") + return True + + try: + # Remove the submission + submission.mod.remove() + self.logger.info(f"Removed submission {submission.id} by {submission.author}") + + # Post removal message + submission.reply(removal_message) + self.logger.info(f"Posted removal message for {submission.id}") + + return True + + except Exception as e: + self.logger.error(f"Error executing rule on {submission.id}: {e}") + return False diff --git a/src/umabot/rules/example_rule.py b/src/umabot/rules/example_rule.py new file mode 100644 index 0000000..68957b4 --- /dev/null +++ b/src/umabot/rules/example_rule.py @@ -0,0 +1,56 @@ +"""Example rule demonstrating how to create custom moderation rules.""" + +import praw.models +from .base import Rule + + +class ExampleRule(Rule): + """Example rule that demonstrates the modular rule system. + + This rule shows how to create a custom rule that: + - Checks for specific keywords in post titles + - Removes posts containing banned words + - Posts a custom removal message + """ + + def __init__(self, config): + """Initialize the example rule.""" + super().__init__(config) + # Define banned words (customize as needed) + self.banned_words = ["spam", "advertisement", "buy now"] + + def should_remove(self, submission: praw.models.Submission) -> bool: + """Check if a submission contains banned words in the title.""" + if not submission.title: + return False + + title_lower = submission.title.lower() + + # Check if any banned words are in the title + for word in self.banned_words: + if word in title_lower: + self.logger.info( + f"Found banned word '{word}' in submission {submission.id}" + ) + return True + + return False + + def get_removal_message(self, submission: praw.models.Submission) -> str: + """Get the removal message for posts with banned words.""" + return ( + "Your post has been removed because it contains inappropriate content. " + "Please review our community guidelines before posting again." + ) + + +# To use this rule, add it to the bot in src/umabot/bot.py: +# +# from .rules import ExampleRule +# +# # In the __init__ method: +# self.rules = [ +# SpamDetector(config), +# RoleplayLimiter(config), +# ExampleRule(config) # Add the example rule +# ] diff --git a/src/umabot/rules/roleplay_limiter.py b/src/umabot/rules/roleplay_limiter.py new file mode 100644 index 0000000..2b88079 --- /dev/null +++ b/src/umabot/rules/roleplay_limiter.py @@ -0,0 +1,60 @@ +"""Roleplay post limiter rule.""" + +from typing import Set +import praw.models +from .base import Rule + + +class RoleplayLimiter(Rule): + """Limits users to one roleplay post.""" + + def __init__(self, config): + """Initialize the roleplay limiter.""" + super().__init__(config) + self.roleplay_users: Set[str] = set() + self.roleplay_flair = "Roleplay" + + def should_remove(self, submission: praw.models.Submission) -> bool: + """Check if a user has already posted a roleplay post.""" + if not submission.author: + return False + + # Check if this is a roleplay post + if not self._is_roleplay_post(submission): + return False + + username = submission.author.name + + # Check if user has already posted a roleplay post + if username in self.roleplay_users: + self.logger.info( + f"User {username} has already posted a roleplay post" + ) + return True + + # Add user to the set of roleplay posters + self.roleplay_users.add(username) + return False + + def get_removal_message(self, submission: praw.models.Submission) -> str: + """Get the roleplay removal message.""" + return self.config.roleplay_message + + def _is_roleplay_post(self, submission: praw.models.Submission) -> bool: + """Check if a submission has the roleplay flair.""" + try: + # Check link flair text + if hasattr(submission, 'link_flair_text') and submission.link_flair_text: + return submission.link_flair_text.lower() == self.roleplay_flair.lower() + + # Check flair template ID (if using new flair system) + if hasattr(submission, 'link_flair_template_id') and submission.link_flair_template_id: + # You might need to map flair template IDs to names + # For now, we'll just check the text + pass + + return False + + except Exception as e: + self.logger.error(f"Error checking flair for submission {submission.id}: {e}") + return False diff --git a/src/umabot/rules/spam_detector.py b/src/umabot/rules/spam_detector.py new file mode 100644 index 0000000..6f6da34 --- /dev/null +++ b/src/umabot/rules/spam_detector.py @@ -0,0 +1,63 @@ +"""Spam detection rule for limiting posts per user per day.""" + +import time +from datetime import datetime, timedelta +from typing import Dict, List +import praw.models +from .base import Rule + + +class SpamDetector(Rule): + """Detects and removes posts from users who post too frequently.""" + + def __init__(self, config): + """Initialize the spam detector.""" + super().__init__(config) + self.user_posts: Dict[str, List[float]] = {} + self.max_posts = config.max_posts_per_day + self.time_window = 24 * 60 * 60 # 24 hours in seconds + + def should_remove(self, submission: praw.models.Submission) -> bool: + """Check if a user has posted too frequently.""" + if not submission.author: + return False + + username = submission.author.name + current_time = time.time() + + # Clean old posts from tracking + self._clean_old_posts(username, current_time) + + # Count current posts in the time window + if username not in self.user_posts: + self.user_posts[username] = [] + + post_count = len(self.user_posts[username]) + + # Add current post to tracking + self.user_posts[username].append(current_time) + + # Check if this post exceeds the limit + if post_count >= self.max_posts: + self.logger.info( + f"User {username} has posted {post_count + 1} times in 24 hours " + f"(limit: {self.max_posts})" + ) + return True + + return False + + def get_removal_message(self, submission: praw.models.Submission) -> str: + """Get the spam removal message.""" + return self.config.spam_message + + def _clean_old_posts(self, username: str, current_time: float) -> None: + """Remove posts older than the time window from tracking.""" + if username not in self.user_posts: + return + + cutoff_time = current_time - self.time_window + self.user_posts[username] = [ + post_time for post_time in self.user_posts[username] + if post_time > cutoff_time + ] -- cgit v1.2.3