diff options
Diffstat (limited to 'test_moderator.py')
| -rwxr-xr-x | test_moderator.py | 693 |
1 files changed, 693 insertions, 0 deletions
#!/usr/bin/env python3
"""Test CLI for the intelligent roleplay moderator.

Provides mock Reddit objects, a Reddit downloader for building local test
corpora, and a harness that runs posts through the same evaluation logic
the production moderator uses, printing the actions it would take.
"""

import argparse
import json
import os
import random
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

from openai import OpenAI
from dotenv import load_dotenv
import praw


@dataclass
class MockSubmission:
    """Mock Reddit submission for testing."""
    id: str
    title: str
    selftext: str
    url: str
    is_video: bool
    is_self: bool
    is_gallery: bool
    link_flair_text: str
    author: 'MockAuthor'

    @property
    def permalink(self) -> str:
        """Fake permalink in the same shape as a real submission's."""
        return f"/r/test/comments/{self.id}/"


@dataclass
class MockAuthor:
    """Mock Reddit author for testing."""
    name: str

    def message(self, subject: str, message: str) -> None:
        """Print the mod mail instead of sending it through Reddit."""
        print(f"\n📧 MOD MAIL TO u/{self.name}")
        print(f"Subject: {subject}")
        print(f"Message:\n{message}")


class RedditDownloader:
    """Downloads posts from Reddit for testing."""

    def __init__(self, config):
        """Initialize the PRAW client from a config object holding credentials."""
        self.reddit = praw.Reddit(
            client_id=config.client_id,
            client_secret=config.client_secret,
            username=config.username,
            password=config.password,
            user_agent=config.user_agent,
        )
        self.subreddit_name = config.subreddit_name

    @staticmethod
    def _is_removed(submission) -> bool:
        """Return True when a submission was removed or deleted."""
        return bool(
            submission.removed_by_category
            or submission.selftext == "[removed]"
            or submission.selftext == "[deleted]"
        )

    @staticmethod
    def _save_submission(submission, output_dir: Path, include_template_id: bool) -> Path:
        """Write one submission to a .txt file in output_dir and return the path."""
        # Build a filesystem-safe filename from the title.
        safe_title = "".join(
            c for c in submission.title if c.isalnum() or c in (' ', '-', '_')
        ).rstrip()
        safe_title = safe_title[:50]  # Limit length
        filename = f"{submission.id}_{safe_title}.txt"
        filepath = output_dir / filename

        # Write post content: self-text posts get the body, link posts get the URL.
        content = f"Title: {submission.title}\n\n"
        if submission.selftext:
            content += submission.selftext
        else:
            content += f"[Link Post: {submission.url}]"

        # Add flair info for debugging.
        if getattr(submission, 'link_flair_text', None):
            content += f"\n\nFlair: {submission.link_flair_text}"
        if include_template_id and getattr(submission, 'link_flair_template_id', None):
            content += f"\nFlair Template ID: {submission.link_flair_template_id}"

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

        # Fixed: the original printed a literal "(unknown)" placeholder here.
        print(f"Downloaded: {filename}")
        return filepath

    def download_roleplay_posts(self, limit: int = 50, output_dir: Optional[Path] = None) -> List[Path]:
        """Download up to `limit` roleplay-flaired posts; return the written paths."""
        if output_dir is None:
            output_dir = Path("real_test_posts")
        output_dir.mkdir(exist_ok=True)

        subreddit = self.reddit.subreddit(self.subreddit_name)
        downloaded_files: List[Path] = []

        print(f"Downloading roleplay posts from r/{self.subreddit_name}...")

        count = 0
        # Fetch extra so filtering can still reach `limit`.
        for submission in subreddit.new(limit=limit * 3):
            if count >= limit:
                break

            # Skip removed posts.
            if self._is_removed(submission):
                continue

            # Identify roleplay posts by flair template ID, falling back to flair text.
            is_roleplay = False
            if hasattr(submission, 'link_flair_template_id'):
                is_roleplay = submission.link_flair_template_id == "311f0024-8302-11f0-9b41-46c005ad843c"
            elif hasattr(submission, 'link_flair_text'):
                is_roleplay = submission.link_flair_text and submission.link_flair_text.lower() == "roleplay"

            if is_roleplay:
                filepath = self._save_submission(submission, output_dir, include_template_id=True)
                downloaded_files.append(filepath)
                count += 1

        print(f"Downloaded {len(downloaded_files)} roleplay posts")
        return downloaded_files

    def download_random_posts(self, limit: int = 20, output_dir: Optional[Path] = None) -> List[Path]:
        """Download up to `limit` non-stickied posts from hot; return the written paths."""
        if output_dir is None:
            output_dir = Path("real_test_posts")
        output_dir.mkdir(exist_ok=True)

        subreddit = self.reddit.subreddit(self.subreddit_name)
        downloaded_files: List[Path] = []

        print(f"Downloading random posts from r/{self.subreddit_name}...")

        count = 0
        # Fetch extra so filtering can still reach `limit`.
        for submission in subreddit.hot(limit=limit * 2):
            if count >= limit:
                break

            # Skip stickied and removed posts.
            if submission.stickied:
                continue
            if self._is_removed(submission):
                continue

            filepath = self._save_submission(submission, output_dir, include_template_id=False)
            downloaded_files.append(filepath)
            count += 1

        print(f"Downloaded {len(downloaded_files)} random posts")
        return downloaded_files


class TestIntelligentModerator:
    """Test version of the intelligent roleplay moderator."""

    def __init__(self, openai_api_key: str):
        """Initialize the test moderator and its evaluation prompt."""
        self.openai_client = OpenAI(api_key=openai_api_key)
        self.roleplay_flair = "Roleplay"
        self.art_flair_template_id = "dd4589ba-7c43-11f0-8ef0-22eefb7f5b84"

        # Evaluation prompt; doubled braces are literal JSON braces for str.format().
        self.evaluation_prompt = """
You are an expert moderator for a roleplay subreddit. Your job is to evaluate roleplay posts and determine:

1. Whether this post would be better flaired as "Art" instead of "Roleplay"
2. Whether this is low-effort content that should be removed

For each post, respond with a JSON object containing:
{{
  "should_be_art": boolean,
  "is_low_effort": boolean,
  "confidence": float (0.0 to 1.0),
  "reasoning": "Brief explanation of your decision"
}}

Guidelines:
- A post should be flaired as "Art" if it's primarily showcasing artwork, images, or visual content with minimal roleplay text
- A post is "low effort" if it lacks substance, creativity, or meaningful roleplay content
- Consider factors like: word count, creativity, effort, engagement potential, originality
- Be strict but fair - err on the side of allowing content unless it's clearly low quality
- High confidence (0.8+) for clear cases, lower confidence for borderline cases

Post to evaluate:
Title: {title}
Content: {content}
Has Media: {has_media}
Media Type: {media_type}
Word Count: {word_count}
"""

    def test_file(self, file_path: Path, author_name: str = "testuser") -> Dict[str, Any]:
        """Run a single text file through the moderator and return a result dict.

        On failure the dict has only "file", "error", and "success": False.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read().strip()

            # Wrap the file content in a mock submission flaired as Roleplay.
            submission = MockSubmission(
                id=f"test_{file_path.stem}",
                title=file_path.stem.replace('_', ' ').title(),
                selftext=content,
                url="",
                is_video=False,
                is_self=True,
                is_gallery=False,
                link_flair_text="Roleplay",
                author=MockAuthor(author_name),
            )

            evaluation = self._evaluate_post(submission)

            # Determine actions (same logic as the production moderator):
            # art re-flair takes priority over low-effort removal.
            actions = []
            if evaluation["should_be_art"]:
                actions.append("CHANGE_FLAIR_TO_ART")
            elif evaluation["is_low_effort"]:
                actions.append("REMOVE_POST")
            else:
                actions.append("ALLOW_POST")

            return {
                "file": str(file_path),
                "title": submission.title,
                "content_preview": content[:200] + "..." if len(content) > 200 else content,
                "word_count": len(content.split()),
                "has_media": self._has_media(submission),
                "media_type": self._get_media_type(submission),
                "evaluation": evaluation,
                "actions": actions,
                "success": True,
            }

        except Exception as e:
            return {
                "file": str(file_path),
                "error": str(e),
                "success": False,
            }

    def test_directory(self, directory_path: Path, author_name: str = "testuser",
                       random_count: Optional[int] = None) -> List[Dict[str, Any]]:
        """Test all .txt files in a directory, optionally a random sample of them."""
        results: List[Dict[str, Any]] = []

        text_files = list(directory_path.glob("*.txt"))
        if not text_files:
            print(f"No .txt files found in {directory_path}")
            return results

        # Random selection if requested.
        total_available = len(text_files)
        if random_count and random_count < total_available:
            text_files = random.sample(text_files, random_count)
            print(f"Randomly selected {random_count} files from {total_available} available files")

        print(f"Testing {len(text_files)} text files...")

        for file_path in text_files:
            print(f"Testing {file_path.name}...")
            results.append(self.test_file(file_path, author_name))

        return results

    def _evaluate_post(self, submission: MockSubmission) -> Dict[str, Any]:
        """Evaluate the post with retry logic; never raises.

        Returns a default "allow" evaluation when all attempts fail.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                return self._evaluate_post_attempt(submission)
            except Exception as e:
                if attempt == max_retries - 1:
                    # Final attempt failed: fall back to a safe default.
                    return {
                        "should_be_art": False,
                        "is_low_effort": False,
                        "confidence": 0.0,
                        "reasoning": f"Failed after {max_retries} attempts: {e}",
                    }
                time.sleep(1)  # Brief backoff before retrying.

        # Unreachable: the loop always returns on the last attempt.
        return {
            "should_be_art": False,
            "is_low_effort": False,
            "confidence": 0.0,
            "reasoning": "Unexpected error in retry logic",
        }

    def _evaluate_post_attempt(self, submission: MockSubmission) -> Dict[str, Any]:
        """Single attempt to evaluate the post using GPT-5-nano."""
        try:
            # Extract post information.
            title = submission.title or ""
            content = submission.selftext or ""
            has_media = self._has_media(submission)
            media_type = self._get_media_type(submission)
            word_count = len(content.split()) if content else 0

            prompt = self.evaluation_prompt.format(
                title=title,
                content=content,
                has_media=has_media,
                media_type=media_type,
                word_count=word_count,
            )

            response = self.openai_client.chat.completions.create(
                model="gpt-5-nano",
                messages=[
                    {"role": "system", "content": "You are an expert roleplay moderator. Always respond with valid JSON only. Do not include any text before or after the JSON object."},
                    {"role": "user", "content": prompt},
                ],
                max_completion_tokens=2000,
            )

            response_text = response.choices[0].message.content.strip()

            # Clean up the response text to handle common JSON formatting issues.
            response_text = response_text.replace('\n', '').replace('\r', '')

            # Extract JSON from code fences if the model wrapped it.
            if '```json' in response_text:
                start = response_text.find('```json') + 7
                end = response_text.find('```', start)
                if end != -1:
                    response_text = response_text[start:end].strip()
            elif '```' in response_text:
                start = response_text.find('```') + 3
                end = response_text.find('```', start)
                if end != -1:
                    response_text = response_text[start:end].strip()

            # Trim to the outermost JSON object boundaries.
            if '{' in response_text and '}' in response_text:
                start = response_text.find('{')
                end = response_text.rfind('}') + 1
                response_text = response_text[start:end]

            try:
                evaluation = json.loads(response_text)
            except json.JSONDecodeError as json_err:
                # Try to fix an unterminated "reasoning" string by closing the quote.
                try:
                    if '"reasoning": "' in response_text and not response_text.rstrip().endswith('"'):
                        response_text = response_text.rstrip() + '"'
                    evaluation = json.loads(response_text)
                except json.JSONDecodeError:
                    raise json_err

            return evaluation

        except json.JSONDecodeError as e:
            # Default evaluation on unparseable model output.
            return {
                "should_be_art": False,
                "is_low_effort": False,
                "confidence": 0.0,
                "reasoning": f"JSON parsing error: {e}",
            }
        except Exception as e:
            # Default evaluation on any other error (API failure, etc.).
            return {
                "should_be_art": False,
                "is_low_effort": False,
                "confidence": 0.0,
                "reasoning": f"Error occurred during evaluation: {e}",
            }

    def _has_media(self, submission: MockSubmission) -> bool:
        """Check if a submission has media attached."""
        try:
            # Fixed: video posts are media. (The original fell through the
            # self-post branch and returned False for videos, contradicting
            # _get_media_type.)
            if submission.is_video:
                return True

            if submission.is_self:
                # For self posts, look for common media URLs in the text.
                if submission.selftext:
                    media_indicators = [
                        'imgur.com', 'i.imgur.com', 'redd.it', 'i.redd.it',
                        'youtube.com', 'youtu.be', 'vimeo.com', 'gfycat.com',
                        'streamable.com', 'twitter.com', 'x.com', 'tiktok.com',
                        'instagram.com', 'facebook.com', 'discord.com', 'discordapp.com',
                    ]
                    text_lower = submission.selftext.lower()
                    return any(indicator in text_lower for indicator in media_indicators)
                return False

            # Check for link posts with media.
            if getattr(submission, 'url', None):
                url_lower = submission.url.lower()
                media_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.gifv', '.mp4', '.webm', '.webp']
                media_domains = [
                    'imgur.com', 'i.imgur.com', 'redd.it', 'i.redd.it',
                    'youtube.com', 'youtu.be', 'vimeo.com', 'gfycat.com',
                    'streamable.com', 'twitter.com', 'x.com', 'tiktok.com',
                    'instagram.com', 'facebook.com',
                ]

                # Check file extensions.
                if url_lower.endswith(tuple(media_extensions)):
                    return True

                # Check media domains.
                if any(domain in url_lower for domain in media_domains):
                    return True

            # Check for gallery posts.
            if getattr(submission, 'is_gallery', False):
                return True

            return False

        except Exception as e:
            print(f"Error checking media: {e}")
            return False

    def _get_media_type(self, submission: MockSubmission) -> str:
        """Get the type of media in the submission: video/gallery/image/link/text."""
        try:
            if submission.is_video:
                return "video"
            elif getattr(submission, 'is_gallery', False):
                return "gallery"
            elif getattr(submission, 'url', None):
                url_lower = submission.url.lower()
                if any(ext in url_lower for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']):
                    return "image"
                elif any(ext in url_lower for ext in ['.mp4', '.webm', '.gifv']):
                    return "video"
                elif any(domain in url_lower for domain in ['youtube.com', 'youtu.be', 'vimeo.com']):
                    return "video"
                else:
                    return "link"
            else:
                return "text"

        except Exception as e:
            print(f"Error getting media type: {e}")
            return "unknown"


def print_results(results: List[Dict[str, Any]]) -> None:
    """Print test results in a formatted way."""
    print("\n" + "=" * 80)
    print("TEST RESULTS")
    print("=" * 80)

    for i, result in enumerate(results, 1):
        print(f"\n{i}. {result['file']}")
        print("-" * 60)

        if not result["success"]:
            print(f"❌ ERROR: {result['error']}")
            continue

        print(f"Title: {result['title']}")
        print(f"Word Count: {result['word_count']}")
        print(f"Has Media: {result['has_media']}")
        print(f"Media Type: {result['media_type']}")
        print(f"Content Preview: {result['content_preview']}")

        evaluation = result["evaluation"]
        print(f"\n🤖 AI Evaluation:")
        print(f"   Should be Art: {evaluation['should_be_art']}")
        print(f"   Is Low Effort: {evaluation['is_low_effort']}")
        print(f"   Confidence: {evaluation['confidence']:.2f}")
        print(f"   Reasoning: {evaluation['reasoning']}")

        actions = result["actions"]
        print(f"\n📋 Actions:")
        for action in actions:
            if action == "CHANGE_FLAIR_TO_ART":
                print("   🎨 Change flair to Art")
            elif action == "REMOVE_POST":
                print("   🗑️ Remove post (low effort)")
            elif action == "ALLOW_POST":
                print("   ✅ Allow post")

        # Show what mod mail would be sent.
        if "CHANGE_FLAIR_TO_ART" in actions:
            print(f"\n📧 Mod Mail (Art Flair Change):")
            print(f"   Subject: Your post flair has been changed to Art")
            print(f"   Message: Your roleplay post has been automatically re-flaired as 'Art' because it appears to be primarily showcasing artwork or visual content rather than roleplay.")
            print(f"   Reasoning: {evaluation['reasoning']}")

        if "REMOVE_POST" in actions:
            print(f"\n📧 Mod Mail (Low Effort Removal):")
            print(f"   Subject: Your roleplay post has been removed for low effort")
            print(f"   Message: Your roleplay post has been removed because it was determined to be low effort content.")
            print(f"   Reasoning: {evaluation['reasoning']}")


def main():
    """Main CLI function."""
    # Load environment variables from .env file.
    load_dotenv()

    parser = argparse.ArgumentParser(
        description="Test the intelligent roleplay moderator against text files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Test a single file
  python test_moderator.py --file sample_post.txt

  # Test all files in a directory
  python test_moderator.py --directory test_posts/

  # Test 5 random files from a directory
  python test_moderator.py --directory real_test_posts/ --random 5

  # Download 20 roleplay posts from Reddit
  python test_moderator.py --download-roleplay 20

  # Download 10 random posts from Reddit
  python test_moderator.py --download-random 10

  # Download posts then test them
  python test_moderator.py --download-roleplay 15
  python test_moderator.py --directory real_test_posts/ --random 5
        """,
    )

    parser.add_argument(
        "--file", "-f",
        type=Path,
        help="Test a single text file",
    )

    parser.add_argument(
        "--directory", "-d",
        type=Path,
        help="Test all .txt files in a directory",
    )

    parser.add_argument(
        "--author", "-a",
        default="testuser",
        help="Author name for mock submissions (default: testuser)",
    )

    parser.add_argument(
        "--api-key", "-k",
        help="OpenAI API key (or set OPENAI_API_KEY environment variable)",
    )

    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show detailed output",
    )

    parser.add_argument(
        "--random", "-r",
        type=int,
        help="Randomly select N files from directory for testing",
    )

    parser.add_argument(
        "--download-roleplay",
        type=int,
        metavar="COUNT",
        help="Download N roleplay posts from Reddit for testing",
    )

    parser.add_argument(
        "--download-random",
        type=int,
        metavar="COUNT",
        help="Download N random posts from Reddit for testing",
    )

    args = parser.parse_args()

    # Get API key (from .env file, environment variable, or command line).
    api_key = args.api_key or os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("❌ Error: OpenAI API key required")
        print("Set OPENAI_API_KEY in .env file, environment variable, or use --api-key")
        sys.exit(1)

    # Handle download commands first.
    if args.download_roleplay or args.download_random:
        # Load Reddit config lazily: only needed for downloads.
        try:
            from src.umabot.config import Config
            config = Config.from_env()
            config.validate()
        except Exception as e:
            print(f"❌ Error loading Reddit config: {e}")
            print("Make sure your .env file has all required Reddit credentials")
            sys.exit(1)

        downloader = RedditDownloader(config)

        if args.download_roleplay:
            downloader.download_roleplay_posts(args.download_roleplay)

        if args.download_random:
            downloader.download_random_posts(args.download_random)

        print("✅ Download complete!")
        return

    # Validate inputs for testing.
    if not args.file and not args.directory:
        print("❌ Error: Must specify either --file or --directory")
        parser.print_help()
        sys.exit(1)

    if args.file and not args.file.exists():
        print(f"❌ Error: File {args.file} does not exist")
        sys.exit(1)

    if args.directory and not args.directory.exists():
        print(f"❌ Error: Directory {args.directory} does not exist")
        sys.exit(1)

    # Initialize moderator.
    try:
        moderator = TestIntelligentModerator(api_key)
    except Exception as e:
        print(f"❌ Error initializing moderator: {e}")
        sys.exit(1)

    # Run tests.
    results = []

    if args.file:
        print(f"Testing single file: {args.file}")
        results.append(moderator.test_file(args.file, args.author))

    if args.directory:
        print(f"Testing directory: {args.directory}")
        results.extend(moderator.test_directory(args.directory, args.author, args.random))

    print_results(results)

    # Summary.
    total = len(results)
    successful = sum(1 for r in results if r["success"])
    errors = total - successful

    print(f"\n" + "=" * 80)
    print(f"SUMMARY: {successful}/{total} tests completed successfully")
    if errors > 0:
        print(f"Errors: {errors}")
    print("=" * 80)


if __name__ == "__main__":
    main()