Spaces:
Runtime error
Runtime error
| """ | |
| FastAPI x402 Demo for Hugging Face Spaces | |
| A pay-per-use AI service with lightweight models | |
| """ | |
| import os | |
| import time | |
| import base64 | |
| import json | |
| import random | |
| import asyncio | |
| from typing import Dict, Any | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, Request, HTTPException, APIRouter | |
| from fastapi.responses import HTMLResponse, JSONResponse | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import torch | |
| from transformers import pipeline | |
| from web3 import Web3 | |
| # New imports for CDP and AWS | |
| import boto3 | |
| from cdp.cdp_client import CdpClient | |
| from events import push | |
| # Load environment variables | |
| load_dotenv() | |
| # Environment validation | |
| REQUIRED_VARS = [ | |
| "PAY_TO_ADDRESS", # Required by fastapi-x402 package | |
| ] | |
| # CDP and AWS variables (only required if using those features) | |
| CDP_VARS = ["CDP_API_KEY", "DEV_WALLET", "HF_WALLET", "AWS_WALLET"] | |
| AWS_VARS = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION"] | |
| # Check required variables | |
| for var in REQUIRED_VARS: | |
| if not os.getenv(var): | |
| raise ValueError(f"Missing required environment variable: {var}") | |
| # Check CDP variables (warn if missing but don't fail) | |
| CDP_ENABLED = all(os.getenv(var) for var in CDP_VARS) | |
| if not CDP_ENABLED: | |
| print(f"⚠️ CDP revenue splitting disabled. Missing: {[v for v in CDP_VARS if not os.getenv(v)]}") | |
| # Check AWS variables (warn if missing but don't fail) | |
| AWS_ENABLED = all(os.getenv(var) for var in AWS_VARS) | |
| if not AWS_ENABLED: | |
| print(f"⚠️ AWS Bedrock image generation disabled. Missing: {[v for v in AWS_VARS if not os.getenv(v)]}") | |
| # Initialize FastAPI and x402 | |
| from fastapi_x402 import init_x402, pay | |
| app = FastAPI(title="x402 AI Demo", description="Pay-per-use AI services") | |
| # Add CORS middleware for web frontend | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| expose_headers=["*"], | |
| ) | |
| # Initialize x402 with testnet for demo | |
| init_x402(app, network=os.getenv("X402_NETWORK", "base-sepolia")) | |
| # Setup templates and static files | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| templates = Jinja2Templates(directory="templates") | |
| # Load lightweight AI models | |
| print("🤖 Loading AI models...") | |
| # Use DistilGPT-2 - much smaller than GPT-2 but still capable | |
| text_generator = pipeline( | |
| "text-generation", | |
| model="distilgpt2", | |
| device=0 if torch.cuda.is_available() else -1 | |
| ) | |
| # Use a simple sentiment analysis model as second service | |
| sentiment_analyzer = pipeline( | |
| "sentiment-analysis", | |
| model="cardiffnlp/twitter-roberta-base-sentiment-latest", | |
| device=0 if torch.cuda.is_available() else -1 | |
| ) | |
| print("✅ Models loaded successfully!") | |
| # AWS Bedrock setup | |
| if AWS_ENABLED: | |
| AWS_REGION = os.getenv("AWS_REGION", "us-west-2") | |
| bedrock = boto3.client("bedrock-runtime", region_name=AWS_REGION) | |
| print(f"✅ AWS Bedrock client initialized in {AWS_REGION}") | |
| def nova_canvas(prompt: str, h=512, w=512): | |
| """Generate image using Amazon Nova Canvas.""" | |
| seed = random.randint(0, 858_993_460) | |
| payload = { | |
| "taskType": "TEXT_IMAGE", | |
| "textToImageParams": {"text": prompt}, | |
| "imageGenerationConfig": { | |
| "seed": seed, | |
| "quality": "standard", | |
| "height": h, | |
| "width": w, | |
| "numberOfImages": 1, | |
| }, | |
| } | |
| try: | |
| resp = bedrock.invoke_model( | |
| modelId="amazon.nova-canvas-v1:0", | |
| body=json.dumps(payload), | |
| ) | |
| img_b64 = json.loads(resp["body"].read())["images"][0] | |
| return img_b64 | |
| except Exception as e: | |
| push("error", service="bedrock", error=str(e)) | |
| raise HTTPException(500, f"Bedrock error: {str(e)}") | |
| else: | |
| def nova_canvas(prompt: str, h=512, w=512): | |
| raise HTTPException(503, "AWS Bedrock not configured") | |
| # CDP revenue splitting setup | |
| if CDP_ENABLED: | |
| split_router = APIRouter() | |
| SPLITS = [ | |
| {"wallet": os.environ["DEV_WALLET"], "pct": 30}, | |
| {"wallet": os.environ["HF_WALLET"], "pct": 30}, | |
| {"wallet": os.environ["AWS_WALLET"], "pct": 30}, | |
| ] # 10% stays in collection wallet | |
| print(f"✅ CDP revenue splitting enabled with {len(SPLITS)} recipients") | |
| async def split_revenue(amount_usdc: float): | |
| """Split revenue using CDP transfers""" | |
| try: | |
| async with CdpClient() as cdp: | |
| # Get our collection account | |
| sender = await cdp.evm.get_or_create_account(name="fastx402-demo") | |
| for split in SPLITS: | |
| split_amount = amount_usdc * (split["pct"] / 100) | |
| # Convert to USDC units (6 decimals) | |
| usdc_units = int(split_amount * 1_000_000) | |
| # Transfer to recipient (use same network as x402) | |
| tx_hash = await sender.transfer( | |
| to=split["wallet"], | |
| amount=usdc_units, | |
| token="usdc", | |
| network=os.getenv("X402_NETWORK", "base-sepolia") | |
| ) | |
| push("payout", | |
| to=split["wallet"][:8] + "...", | |
| usdc=split_amount, | |
| pct=split["pct"], | |
| tx_hash=str(tx_hash)[:10]) | |
| print(f"💸 Sent ${split_amount:.4f} USDC to {split['wallet'][:10]}...") | |
| return True | |
| except Exception as e: | |
| push("payout_error", error=str(e), amount=amount_usdc) | |
| print(f"❌ Revenue split error: {e}") | |
| return False | |
| async def trigger_split(amount: float = 0.01): | |
| """Manual endpoint to trigger revenue splitting (for testing)""" | |
| success = await split_revenue(amount) | |
| return {"success": success, "amount": amount, "splits": len(SPLITS)} | |
| # Register CDP router with app | |
| app.include_router(split_router) | |
| # Request models | |
| class TextRequest(BaseModel): | |
| prompt: str | |
| class SentimentRequest(BaseModel): | |
| text: str | |
| # Routes | |
| async def read_root(request: Request): | |
| """Serve the main demo page""" | |
| return templates.TemplateResponse("index.html", {"request": request}) | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return {"status": "healthy", "models_loaded": True} | |
| # 1 cent for text generation | |
| async def generate_text(request: TextRequest): | |
| """Generate text using DistilGPT-2""" | |
| try: | |
| print(f"💭 Generating text for: '{request.prompt}'") | |
| push("paid_inference", service="text-gen", usdc=0.01, prompt=request.prompt[:50]) | |
| result = text_generator( | |
| request.prompt, | |
| max_new_tokens=50, | |
| num_return_sequences=1, | |
| temperature=0.8, | |
| do_sample=True, | |
| pad_token_id=text_generator.tokenizer.eos_token_id | |
| )[0]['generated_text'] | |
| push("inference_success", service="text-gen", length=len(result)) | |
| # Trigger revenue split for successful payment | |
| if CDP_ENABLED: | |
| asyncio.create_task(split_revenue(0.01)) | |
| return {"result": result, "model": "distilgpt2"} | |
| except Exception as e: | |
| print(f"❌ Text generation error: {e}") | |
| push("inference_error", service="text-gen", error=str(e)) | |
| return JSONResponse(status_code=500, content={"error": str(e)}) | |
| # Half cent for sentiment analysis | |
| async def analyze_sentiment(request: SentimentRequest): | |
| """Analyze sentiment using RoBERTa""" | |
| try: | |
| print(f"😊 Analyzing sentiment for: '{request.text}'") | |
| push("paid_inference", service="sentiment", usdc=0.005, text=request.text[:50]) | |
| result = sentiment_analyzer(request.text)[0] | |
| push("inference_success", service="sentiment", sentiment=result["label"], confidence=result["score"]) | |
| # Trigger revenue split for successful payment | |
| if CDP_ENABLED: | |
| asyncio.create_task(split_revenue(0.005)) | |
| return { | |
| "text": request.text, | |
| "sentiment": result["label"], | |
| "confidence": round(result["score"], 3), | |
| "model": "twitter-roberta-base-sentiment" | |
| } | |
| except Exception as e: | |
| print(f"❌ Sentiment analysis error: {e}") | |
| push("inference_error", service="sentiment", error=str(e)) | |
| return JSONResponse(status_code=500, content={"error": str(e)}) | |
| # Add image generation request model | |
| class ImageRequest(BaseModel): | |
| prompt: str | |
| # 6 cents for image generation | |
| async def generate_image(request: ImageRequest): | |
| """Generate image using Amazon Nova Canvas""" | |
| try: | |
| print(f"🖼️ Generating image for: '{request.prompt}'") | |
| push("paid_inference", service="image-gen", usdc=0.06, prompt=request.prompt[:50]) | |
| img_b64 = nova_canvas(request.prompt) | |
| push("inference_success", service="image-gen", size="512x512") | |
| # Trigger revenue split for successful payment | |
| if CDP_ENABLED: | |
| asyncio.create_task(split_revenue(0.02)) | |
| return { | |
| "image": img_b64, | |
| "model": "nova-canvas-v1", | |
| "prompt": request.prompt | |
| } | |
| except Exception as e: | |
| print(f"❌ Image generation error: {e}") | |
| push("inference_error", service="image-gen", error=str(e)) | |
| return JSONResponse(status_code=500, content={"error": str(e)}) | |
| async def debug_info(): | |
| """Debug endpoint to check x402 configuration""" | |
| from fastapi_x402.core import get_config, get_facilitator_client | |
| config = get_config() | |
| facilitator = get_facilitator_client() | |
| return { | |
| "network": config.network, | |
| "facilitator_url": facilitator.base_url, | |
| "is_coinbase_cdp": facilitator.is_coinbase_cdp, | |
| "device": "cuda" if torch.cuda.is_available() else "cpu", | |
| "models": ["distilgpt2", "twitter-roberta-base-sentiment"], | |
| "features": { | |
| "cdp_enabled": CDP_ENABLED, | |
| "aws_enabled": AWS_ENABLED | |
| } | |
| } | |
| # Live dashboard and events API | |
| async def get_events(): | |
| """Get real-time events for dashboard (JSON feed for JS)""" | |
| from events import dump | |
| return JSONResponse(dump()) | |
| async def get_splits(): | |
| """Get revenue split configuration""" | |
| if not CDP_ENABLED: | |
| return {"error": "CDP not enabled"} | |
| return { | |
| "splits": SPLITS, | |
| "collection_account": "fastx402-demo", | |
| "reserve_percentage": 10 | |
| } | |
| async def dashboard(refresh_rate: int = 2000): | |
| """Live revenue split dashboard""" | |
| return f""" | |
| <!doctype html> | |
| <html> | |
| <head> | |
| <meta charset='utf-8'> | |
| <title>x402 Live Revenue Dashboard</title> | |
| <style> | |
| body {{ font-family: sans-serif; margin: 40px; background: #f8fafc; }} | |
| .container {{ max-width: 1200px; margin: 0 auto; }} | |
| .header {{ background: white; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }} | |
| .stats {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; margin-bottom: 20px; }} | |
| .stat-card {{ background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }} | |
| .stat-value {{ font-size: 2em; font-weight: bold; color: #059669; }} | |
| .stat-label {{ color: #6b7280; font-size: 0.9em; }} | |
| table {{ width: 100%; background: white; border-radius: 8px; overflow: hidden; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }} | |
| th, td {{ padding: 12px; text-align: left; border-bottom: 1px solid #e5e7eb; }} | |
| th {{ background: #f9fafb; font-weight: 600; }} | |
| .event-paid {{ background: #ecfdf5; }} | |
| .event-deposit {{ background: #fef3c7; }} | |
| .event-payout {{ background: #e0f2fe; }} | |
| .event-error {{ background: #fef2f2; }} | |
| .refresh-info {{ color: #6b7280; font-size: 0.8em; margin-top: 10px; }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="container"> | |
| <div class="header"> | |
| <h1>🚀 x402 Live Revenue Dashboard</h1> | |
| <p>Real-time payment and revenue splitting activity</p> | |
| <div class="refresh-info">Auto-refreshes every {refresh_rate/1000}s</div> | |
| </div> | |
| <div class="stats" id="stats"> | |
| <div class="stat-card"> | |
| <div class="stat-value" id="total-payments">0</div> | |
| <div class="stat-label">Total Payments</div> | |
| </div> | |
| <div class="stat-card"> | |
| <div class="stat-value" id="total-revenue">$0.00</div> | |
| <div class="stat-label">Total Revenue</div> | |
| </div> | |
| <div class="stat-card"> | |
| <div class="stat-value" id="total-payouts">0</div> | |
| <div class="stat-label">Payouts Made</div> | |
| </div> | |
| </div> | |
| <table> | |
| <thead> | |
| <tr> | |
| <th>Time</th> | |
| <th>Event</th> | |
| <th>Details</th> | |
| </tr> | |
| </thead> | |
| <tbody id="events-log"></tbody> | |
| </table> | |
| </div> | |
| <script> | |
| let totalPayments = 0; | |
| let totalRevenue = 0; | |
| let totalPayouts = 0; | |
| async function updateDashboard() {{ | |
| try {{ | |
| const response = await fetch('/events'); | |
| const events = await response.json(); | |
| // Reset counters | |
| totalPayments = 0; | |
| totalRevenue = 0; | |
| totalPayouts = 0; | |
| // Update event log | |
| const tbody = document.getElementById('events-log'); | |
| tbody.innerHTML = ''; | |
| events.forEach(event => {{ | |
| const row = document.createElement('tr'); | |
| let className = ''; | |
| let details = ''; | |
| // Calculate stats and format details | |
| switch(event.kind) {{ | |
| case 'paid_inference': | |
| totalPayments++; | |
| totalRevenue += event.usdc || 0; | |
| className = 'event-paid'; | |
| details = `Service: ${{event.service}}, Amount: $${{(event.usdc || 0).toFixed(3)}}`; | |
| break; | |
| case 'deposit': | |
| className = 'event-deposit'; | |
| details = `TX: ${{event.tx}}, Amount: $${{(event.usdc || 0).toFixed(3)}}`; | |
| break; | |
| case 'payout': | |
| totalPayouts++; | |
| className = 'event-payout'; | |
| details = `To: ${{event.to}}, Amount: $${{(event.usdc || 0).toFixed(3)}} (${{event.pct}}%)`; | |
| break; | |
| case 'inference_error': | |
| case 'error': | |
| case 'webhook_error': | |
| className = 'event-error'; | |
| details = `Error: ${{event.error}}`; | |
| break; | |
| default: | |
| details = JSON.stringify(event); | |
| }} | |
| row.className = className; | |
| row.innerHTML = ` | |
| <td>${{event.t}}</td> | |
| <td>${{event.kind}}</td> | |
| <td>${{details}}</td> | |
| `; | |
| tbody.appendChild(row); | |
| }}); | |
| // Update stats | |
| document.getElementById('total-payments').textContent = totalPayments; | |
| document.getElementById('total-revenue').textContent = `$${{totalRevenue.toFixed(3)}}`; | |
| document.getElementById('total-payouts').textContent = totalPayouts; | |
| }} catch (error) {{ | |
| console.error('Dashboard update failed:', error); | |
| }} | |
| }} | |
| // Initial load and periodic updates | |
| updateDashboard(); | |
| setInterval(updateDashboard, {refresh_rate}); | |
| </script> | |
| </body> | |
| </html> | |
| """ | |
| if __name__ == "__main__": | |
| import uvicorn | |
| # Disable uvloop to avoid async context issues, use standard asyncio instead | |
| uvicorn.run(app, host="0.0.0.0", port=7860, loop="asyncio") | |