aboutsummaryrefslogtreecommitdiff
path: root/test_moderator.py
diff options
context:
space:
mode:
Diffstat (limited to 'test_moderator.py')
-rwxr-xr-xtest_moderator.py693
1 files changed, 693 insertions, 0 deletions
diff --git a/test_moderator.py b/test_moderator.py
new file mode 100755
index 0000000..50b6715
--- /dev/null
+++ b/test_moderator.py
@@ -0,0 +1,693 @@
+#!/usr/bin/env python3
+"""Test CLI for the intelligent roleplay moderator."""
+
+import argparse
+import json
+import os
+import random
+import sys
+from pathlib import Path
+from typing import Dict, Any, List
+from dataclasses import dataclass
+from openai import OpenAI
+from dotenv import load_dotenv
+import praw
+
+
+@dataclass
+class MockSubmission:
+ """Mock Reddit submission for testing."""
+ id: str
+ title: str
+ selftext: str
+ url: str
+ is_video: bool
+ is_self: bool
+ is_gallery: bool
+ link_flair_text: str
+ author: 'MockAuthor'
+
+ @property
+ def permalink(self) -> str:
+ return f"/r/test/comments/{self.id}/"
+
+
+@dataclass
+class MockAuthor:
+ """Mock Reddit author for testing."""
+ name: str
+
+ def message(self, subject: str, message: str) -> None:
+ print(f"\nšŸ“§ MOD MAIL TO u/{self.name}")
+ print(f"Subject: {subject}")
+ print(f"Message:\n{message}")
+
+
+class RedditDownloader:
+ """Downloads posts from Reddit for testing."""
+
+ def __init__(self, config):
+ """Initialize Reddit client."""
+ self.reddit = praw.Reddit(
+ client_id=config.client_id,
+ client_secret=config.client_secret,
+ username=config.username,
+ password=config.password,
+ user_agent=config.user_agent
+ )
+ self.subreddit_name = config.subreddit_name
+
+ def download_roleplay_posts(self, limit: int = 50, output_dir: Path = None) -> List[Path]:
+ """Download roleplay posts from the subreddit."""
+ if output_dir is None:
+ output_dir = Path("real_test_posts")
+
+ output_dir.mkdir(exist_ok=True)
+
+ subreddit = self.reddit.subreddit(self.subreddit_name)
+ downloaded_files = []
+
+ print(f"Downloading roleplay posts from r/{self.subreddit_name}...")
+
+ count = 0
+ for submission in subreddit.new(limit=limit * 3): # Get more to filter
+ if count >= limit:
+ break
+
+ # Skip removed posts
+ if submission.removed_by_category or submission.selftext == "[removed]" or submission.selftext == "[deleted]":
+ continue
+
+ # Check if it's a roleplay post by flair template ID
+ is_roleplay = False
+ if hasattr(submission, 'link_flair_template_id'):
+ is_roleplay = submission.link_flair_template_id == "311f0024-8302-11f0-9b41-46c005ad843c"
+ elif hasattr(submission, 'link_flair_text'):
+ is_roleplay = submission.link_flair_text and submission.link_flair_text.lower() == "roleplay"
+
+ if is_roleplay:
+ # Create filename
+ safe_title = "".join(c for c in submission.title if c.isalnum() or c in (' ', '-', '_')).rstrip()
+ safe_title = safe_title[:50] # Limit length
+ filename = f"{submission.id}_{safe_title}.txt"
+ filepath = output_dir / filename
+
+ # Write post content
+ content = f"Title: {submission.title}\n\n"
+ if submission.selftext:
+ content += submission.selftext
+ else:
+ content += f"[Link Post: {submission.url}]"
+
+ # Add flair info for debugging
+ if hasattr(submission, 'link_flair_text') and submission.link_flair_text:
+ content += f"\n\nFlair: {submission.link_flair_text}"
+ if hasattr(submission, 'link_flair_template_id') and submission.link_flair_template_id:
+ content += f"\nFlair Template ID: {submission.link_flair_template_id}"
+
+ with open(filepath, 'w', encoding='utf-8') as f:
+ f.write(content)
+
+ downloaded_files.append(filepath)
+ count += 1
+ print(f"Downloaded: {filename}")
+
+ print(f"Downloaded {len(downloaded_files)} roleplay posts")
+ return downloaded_files
+
+ def download_random_posts(self, limit: int = 20, output_dir: Path = None) -> List[Path]:
+ """Download random posts from the subreddit."""
+ if output_dir is None:
+ output_dir = Path("real_test_posts")
+
+ output_dir.mkdir(exist_ok=True)
+
+ subreddit = self.reddit.subreddit(self.subreddit_name)
+ downloaded_files = []
+
+ print(f"Downloading random posts from r/{self.subreddit_name}...")
+
+ count = 0
+ for submission in subreddit.hot(limit=limit * 2): # Get more to filter
+ if count >= limit:
+ break
+
+ # Skip stickied posts
+ if submission.stickied:
+ continue
+
+ # Skip removed posts
+ if submission.removed_by_category or submission.selftext == "[removed]" or submission.selftext == "[deleted]":
+ continue
+
+ # Create filename
+ safe_title = "".join(c for c in submission.title if c.isalnum() or c in (' ', '-', '_')).rstrip()
+ safe_title = safe_title[:50] # Limit length
+ filename = f"{submission.id}_{safe_title}.txt"
+ filepath = output_dir / filename
+
+ # Write post content
+ content = f"Title: {submission.title}\n\n"
+ if submission.selftext:
+ content += submission.selftext
+ else:
+ content += f"[Link Post: {submission.url}]"
+
+ # Add flair info
+ if hasattr(submission, 'link_flair_text') and submission.link_flair_text:
+ content += f"\n\nFlair: {submission.link_flair_text}"
+
+ with open(filepath, 'w', encoding='utf-8') as f:
+ f.write(content)
+
+ downloaded_files.append(filepath)
+ count += 1
+ print(f"Downloaded: {filename}")
+
+ print(f"Downloaded {len(downloaded_files)} random posts")
+ return downloaded_files
+
+
+class TestIntelligentModerator:
+ """Test version of the intelligent roleplay moderator."""
+
+ def __init__(self, openai_api_key: str):
+ """Initialize the test moderator."""
+ self.openai_client = OpenAI(api_key=openai_api_key)
+ self.roleplay_flair = "Roleplay"
+ self.art_flair_template_id = "dd4589ba-7c43-11f0-8ef0-22eefb7f5b84"
+
+ # Evaluation prompt
+ self.evaluation_prompt = """
+You are an expert moderator for a roleplay subreddit. Your job is to evaluate roleplay posts and determine:
+
+1. Whether this post would be better flaired as "Art" instead of "Roleplay"
+2. Whether this is low-effort content that should be removed
+
+For each post, respond with a JSON object containing:
+{{
+ "should_be_art": boolean,
+ "is_low_effort": boolean,
+ "confidence": float (0.0 to 1.0),
+ "reasoning": "Brief explanation of your decision"
+}}
+
+Guidelines:
+- A post should be flaired as "Art" if it's primarily showcasing artwork, images, or visual content with minimal roleplay text
+- A post is "low effort" if it lacks substance, creativity, or meaningful roleplay content
+- Consider factors like: word count, creativity, effort, engagement potential, originality
+- Be strict but fair - err on the side of allowing content unless it's clearly low quality
+- High confidence (0.8+) for clear cases, lower confidence for borderline cases
+
+Post to evaluate:
+Title: {title}
+Content: {content}
+Has Media: {has_media}
+Media Type: {media_type}
+Word Count: {word_count}
+"""
+
+ def test_file(self, file_path: Path, author_name: str = "testuser") -> Dict[str, Any]:
+ """Test a single file against the moderator."""
+ try:
+ # Read file content
+ with open(file_path, 'r', encoding='utf-8') as f:
+ content = f.read().strip()
+
+ # Create mock submission
+ submission = MockSubmission(
+ id=f"test_{file_path.stem}",
+ title=file_path.stem.replace('_', ' ').title(),
+ selftext=content,
+ url="",
+ is_video=False,
+ is_self=True,
+ is_gallery=False,
+ link_flair_text="Roleplay",
+ author=MockAuthor(author_name)
+ )
+
+ # Evaluate the post
+ evaluation = self._evaluate_post(submission)
+
+ # Determine actions (same logic as production moderator)
+ actions = []
+ if evaluation["should_be_art"]:
+ # Change flair to Art and notify user (priority over removal)
+ actions.append("CHANGE_FLAIR_TO_ART")
+ elif evaluation["is_low_effort"]:
+ # Remove low effort post and notify user
+ actions.append("REMOVE_POST")
+ else:
+ # Post is good quality roleplay - allow it
+ actions.append("ALLOW_POST")
+
+ return {
+ "file": str(file_path),
+ "title": submission.title,
+ "content_preview": content[:200] + "..." if len(content) > 200 else content,
+ "word_count": len(content.split()),
+ "has_media": self._has_media(submission),
+ "media_type": self._get_media_type(submission),
+ "evaluation": evaluation,
+ "actions": actions,
+ "success": True
+ }
+
+ except Exception as e:
+ return {
+ "file": str(file_path),
+ "error": str(e),
+ "success": False
+ }
+
+ def test_directory(self, directory_path: Path, author_name: str = "testuser", random_count: int = None) -> List[Dict[str, Any]]:
+ """Test all text files in a directory."""
+ results = []
+
+ # Find all text files
+ text_files = list(directory_path.glob("*.txt"))
+
+ if not text_files:
+ print(f"No .txt files found in {directory_path}")
+ return results
+
+ # Random selection if requested
+ if random_count and random_count < len(text_files):
+ text_files = random.sample(text_files, random_count)
+ print(f"Randomly selected {random_count} files from {len(list(directory_path.glob('*.txt')))} available files")
+
+ print(f"Testing {len(text_files)} text files...")
+
+ for file_path in text_files:
+ print(f"Testing {file_path.name}...")
+ result = self.test_file(file_path, author_name)
+ results.append(result)
+
+ return results
+
+ def _evaluate_post(self, submission: MockSubmission) -> Dict[str, Any]:
+ """Use GPT-5-nano to evaluate the post with retry logic."""
+ response_text = "" # Initialize to avoid scope issues
+
+ max_retries = 3
+ for attempt in range(max_retries):
+ try:
+ return self._evaluate_post_attempt(submission)
+ except Exception as e:
+ if attempt == max_retries - 1:
+ # Last attempt failed, return default evaluation
+ return {
+ "should_be_art": False,
+ "is_low_effort": False,
+ "confidence": 0.0,
+ "reasoning": f"Failed after {max_retries} attempts: {e}"
+ }
+ else:
+ import time
+ time.sleep(1)
+
+ # This should never be reached, but just in case
+ return {
+ "should_be_art": False,
+ "is_low_effort": False,
+ "confidence": 0.0,
+ "reasoning": "Unexpected error in retry logic"
+ }
+
+ def _evaluate_post_attempt(self, submission: MockSubmission) -> Dict[str, Any]:
+ """Single attempt to evaluate the post using GPT-5-nano."""
+ response_text = "" # Initialize to avoid scope issues
+
+ try:
+ # Extract post information
+ title = submission.title or ""
+ content = submission.selftext or ""
+ has_media = self._has_media(submission)
+ media_type = self._get_media_type(submission)
+ word_count = len(content.split()) if content else 0
+
+ # Prepare the prompt
+ prompt = self.evaluation_prompt.format(
+ title=title,
+ content=content,
+ has_media=has_media,
+ media_type=media_type,
+ word_count=word_count
+ )
+
+ # Call GPT-5-nano
+ response = self.openai_client.chat.completions.create(
+ model="gpt-5-nano",
+ messages=[
+ {"role": "system", "content": "You are an expert roleplay moderator. Always respond with valid JSON only. Do not include any text before or after the JSON object."},
+ {"role": "user", "content": prompt}
+ ],
+ max_completion_tokens=2000
+ )
+
+ # Parse the response
+ response_text = response.choices[0].message.content.strip()
+
+ # Clean up the response text to handle common JSON formatting issues
+ response_text = response_text.replace('\n', '').replace('\r', '')
+
+ # Try to extract JSON from the response if it's embedded in other text
+ if '```json' in response_text:
+ # Extract JSON from code blocks
+ start = response_text.find('```json') + 7
+ end = response_text.find('```', start)
+ if end != -1:
+ response_text = response_text[start:end].strip()
+ elif '```' in response_text:
+ # Extract JSON from generic code blocks
+ start = response_text.find('```') + 3
+ end = response_text.find('```', start)
+ if end != -1:
+ response_text = response_text[start:end].strip()
+
+ # Find JSON object boundaries
+ if '{' in response_text and '}' in response_text:
+ start = response_text.find('{')
+ end = response_text.rfind('}') + 1
+ response_text = response_text[start:end]
+
+ try:
+ evaluation = json.loads(response_text)
+ except json.JSONDecodeError as json_err:
+ # Try to fix common JSON issues
+ try:
+ # Fix unterminated strings by adding missing quotes
+ if '"reasoning": "' in response_text and not response_text.rstrip().endswith('"'):
+ response_text = response_text.rstrip() + '"'
+ evaluation = json.loads(response_text)
+ except:
+ raise json_err
+
+ return evaluation
+
+ except json.JSONDecodeError as e:
+ # Return default evaluation on JSON error
+ return {
+ "should_be_art": False,
+ "is_low_effort": False,
+ "confidence": 0.0,
+ "reasoning": f"JSON parsing error: {e}"
+ }
+ except Exception as e:
+ # Return default evaluation on error
+ return {
+ "should_be_art": False,
+ "is_low_effort": False,
+ "confidence": 0.0,
+ "reasoning": f"Error occurred during evaluation: {e}"
+ }
+
+ def _has_media(self, submission: MockSubmission) -> bool:
+ """Check if a submission has media attached."""
+ try:
+ # Check for image/video posts
+ if submission.is_video or submission.is_self:
+ # For self posts, check if they contain media links
+ if submission.is_self and submission.selftext:
+ # Look for common media URLs in the text
+ media_indicators = [
+ 'imgur.com', 'i.imgur.com', 'redd.it', 'i.redd.it',
+ 'youtube.com', 'youtu.be', 'vimeo.com', 'gfycat.com',
+ 'streamable.com', 'twitter.com', 'x.com', 'tiktok.com',
+ 'instagram.com', 'facebook.com', 'discord.com', 'discordapp.com'
+ ]
+
+ text_lower = submission.selftext.lower()
+ return any(indicator in text_lower for indicator in media_indicators)
+ return False
+
+ # Check for link posts with media
+ if hasattr(submission, 'url') and submission.url:
+ # Check if URL points to media
+ url_lower = submission.url.lower()
+ media_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.gifv', '.mp4', '.webm', '.webp']
+ media_domains = [
+ 'imgur.com', 'i.imgur.com', 'redd.it', 'i.redd.it',
+ 'youtube.com', 'youtu.be', 'vimeo.com', 'gfycat.com',
+ 'streamable.com', 'twitter.com', 'x.com', 'tiktok.com',
+ 'instagram.com', 'facebook.com'
+ ]
+
+ # Check file extensions
+ if any(url_lower.endswith(ext) for ext in media_extensions):
+ return True
+
+ # Check media domains
+ if any(domain in url_lower for domain in media_domains):
+ return True
+
+ # Check for gallery posts
+ if hasattr(submission, 'is_gallery') and submission.is_gallery:
+ return True
+
+ return False
+
+ except Exception as e:
+ print(f"Error checking media: {e}")
+ return False
+
+ def _get_media_type(self, submission: MockSubmission) -> str:
+ """Get the type of media in the submission."""
+ try:
+ if submission.is_video:
+ return "video"
+ elif hasattr(submission, 'is_gallery') and submission.is_gallery:
+ return "gallery"
+ elif hasattr(submission, 'url') and submission.url:
+ url_lower = submission.url.lower()
+ if any(ext in url_lower for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']):
+ return "image"
+ elif any(ext in url_lower for ext in ['.mp4', '.webm', '.gifv']):
+ return "video"
+ elif any(domain in url_lower for domain in ['youtube.com', 'youtu.be', 'vimeo.com']):
+ return "video"
+ else:
+ return "link"
+ else:
+ return "text"
+
+ except Exception as e:
+ print(f"Error getting media type: {e}")
+ return "unknown"
+
+
+def print_results(results: List[Dict[str, Any]]) -> None:
+ """Print test results in a formatted way."""
+ print("\n" + "="*80)
+ print("TEST RESULTS")
+ print("="*80)
+
+ for i, result in enumerate(results, 1):
+ print(f"\n{i}. {result['file']}")
+ print("-" * 60)
+
+ if not result["success"]:
+ print(f"āŒ ERROR: {result['error']}")
+ continue
+
+ print(f"Title: {result['title']}")
+ print(f"Word Count: {result['word_count']}")
+ print(f"Has Media: {result['has_media']}")
+ print(f"Media Type: {result['media_type']}")
+ print(f"Content Preview: {result['content_preview']}")
+
+ evaluation = result["evaluation"]
+ print(f"\nšŸ¤– AI Evaluation:")
+ print(f" Should be Art: {evaluation['should_be_art']}")
+ print(f" Is Low Effort: {evaluation['is_low_effort']}")
+ print(f" Confidence: {evaluation['confidence']:.2f}")
+ print(f" Reasoning: {evaluation['reasoning']}")
+
+ actions = result["actions"]
+ print(f"\nšŸ“‹ Actions:")
+ for action in actions:
+ if action == "CHANGE_FLAIR_TO_ART":
+ print(" šŸŽØ Change flair to Art")
+ elif action == "REMOVE_POST":
+ print(" šŸ—‘ļø Remove post (low effort)")
+ elif action == "ALLOW_POST":
+ print(" āœ… Allow post")
+
+ # Show what mod mail would be sent
+ if "CHANGE_FLAIR_TO_ART" in actions:
+ print(f"\nšŸ“§ Mod Mail (Art Flair Change):")
+ print(f" Subject: Your post flair has been changed to Art")
+ print(f" Message: Your roleplay post has been automatically re-flaired as 'Art' because it appears to be primarily showcasing artwork or visual content rather than roleplay.")
+ print(f" Reasoning: {evaluation['reasoning']}")
+
+ if "REMOVE_POST" in actions:
+ print(f"\nšŸ“§ Mod Mail (Low Effort Removal):")
+ print(f" Subject: Your roleplay post has been removed for low effort")
+ print(f" Message: Your roleplay post has been removed because it was determined to be low effort content.")
+ print(f" Reasoning: {evaluation['reasoning']}")
+
+
+def main():
+ """Main CLI function."""
+ # Load environment variables from .env file
+ load_dotenv()
+
+ parser = argparse.ArgumentParser(
+ description="Test the intelligent roleplay moderator against text files",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Examples:
+ # Test a single file
+ python test_moderator.py --file sample_post.txt
+
+ # Test all files in a directory
+ python test_moderator.py --directory test_posts/
+
+ # Test 5 random files from a directory
+ python test_moderator.py --directory real_test_posts/ --random 5
+
+ # Download 20 roleplay posts from Reddit
+ python test_moderator.py --download-roleplay 20
+
+ # Download 10 random posts from Reddit
+ python test_moderator.py --download-random 10
+
+ # Download posts then test them
+ python test_moderator.py --download-roleplay 15
+ python test_moderator.py --directory real_test_posts/ --random 5
+ """
+ )
+
+ parser.add_argument(
+ "--file", "-f",
+ type=Path,
+ help="Test a single text file"
+ )
+
+ parser.add_argument(
+ "--directory", "-d",
+ type=Path,
+ help="Test all .txt files in a directory"
+ )
+
+ parser.add_argument(
+ "--author", "-a",
+ default="testuser",
+ help="Author name for mock submissions (default: testuser)"
+ )
+
+ parser.add_argument(
+ "--api-key", "-k",
+ help="OpenAI API key (or set OPENAI_API_KEY environment variable)"
+ )
+
+ parser.add_argument(
+ "--verbose", "-v",
+ action="store_true",
+ help="Show detailed output"
+ )
+
+ parser.add_argument(
+ "--random", "-r",
+ type=int,
+ help="Randomly select N files from directory for testing"
+ )
+
+ parser.add_argument(
+ "--download-roleplay",
+ type=int,
+ metavar="COUNT",
+ help="Download N roleplay posts from Reddit for testing"
+ )
+
+ parser.add_argument(
+ "--download-random",
+ type=int,
+ metavar="COUNT",
+ help="Download N random posts from Reddit for testing"
+ )
+
+ args = parser.parse_args()
+
+ # Get API key (from .env file, environment variable, or command line)
+ api_key = args.api_key or os.getenv("OPENAI_API_KEY")
+ if not api_key:
+ print("āŒ Error: OpenAI API key required")
+ print("Set OPENAI_API_KEY in .env file, environment variable, or use --api-key")
+ sys.exit(1)
+
+ # Handle download commands first
+ if args.download_roleplay or args.download_random:
+ # Load Reddit config
+ try:
+ from src.umabot.config import Config
+ config = Config.from_env()
+ config.validate()
+ except Exception as e:
+ print(f"āŒ Error loading Reddit config: {e}")
+ print("Make sure your .env file has all required Reddit credentials")
+ sys.exit(1)
+
+ downloader = RedditDownloader(config)
+
+ if args.download_roleplay:
+ downloader.download_roleplay_posts(args.download_roleplay)
+
+ if args.download_random:
+ downloader.download_random_posts(args.download_random)
+
+ print("āœ… Download complete!")
+ return
+
+ # Validate inputs for testing
+ if not args.file and not args.directory:
+ print("āŒ Error: Must specify either --file or --directory")
+ parser.print_help()
+ sys.exit(1)
+
+ if args.file and not args.file.exists():
+ print(f"āŒ Error: File {args.file} does not exist")
+ sys.exit(1)
+
+ if args.directory and not args.directory.exists():
+ print(f"āŒ Error: Directory {args.directory} does not exist")
+ sys.exit(1)
+
+ # Initialize moderator
+ try:
+ moderator = TestIntelligentModerator(api_key)
+ except Exception as e:
+ print(f"āŒ Error initializing moderator: {e}")
+ sys.exit(1)
+
+ # Run tests
+ results = []
+
+ if args.file:
+ print(f"Testing single file: {args.file}")
+ result = moderator.test_file(args.file, args.author)
+ results.append(result)
+
+ if args.directory:
+ print(f"Testing directory: {args.directory}")
+ dir_results = moderator.test_directory(args.directory, args.author, args.random)
+ results.extend(dir_results)
+
+ # Print results
+ print_results(results)
+
+ # Summary
+ total = len(results)
+ successful = sum(1 for r in results if r["success"])
+ errors = total - successful
+
+ print(f"\n" + "="*80)
+ print(f"SUMMARY: {successful}/{total} tests completed successfully")
+ if errors > 0:
+ print(f"Errors: {errors}")
+ print("="*80)
+
+
+if __name__ == "__main__":
+ main()