#!/usr/bin/env python3
"""
Grok Imagine Favorites Scraper
================================

Three-pass scraper for grok.com/imagine/saved

Pass 1 — Scrape the favorites list. Extracts image/video cards, downloads
preview images, populates the `images` table.
Pass 2 — Visit each image detail page (full JS hydration). Collects
description/prompt metadata and discovers all sibling video UUIDs into the
`videos` table.
Pass 3 — Visit each video detail page (static head only). Extracts prompt
and share video URL, downloads the video file.

Usage:
    python scrape.py --pass 1
    python scrape.py --pass 2
    python scrape.py --pass 3
    python scrape.py --pass 1 --redo
    python scrape.py --pass 2 --redo
    python scrape.py --pass 2 --redo --uuid
    python scrape.py --pass 3 --redo --uuid
"""
import argparse
import asyncio
import json
import logging
import os
import pathlib
import re
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

import yaml
from easydict import EasyDict as edict
from playwright.async_api import async_playwright, Page

# ─────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────

THIS_DIR = pathlib.Path(__file__).resolve().parent
# Loaded once at import time; CLI flags in main() may override fields later.
CFG = edict(yaml.safe_load(open(THIS_DIR / 'config.yaml')))

# ─────────────────────────────────────────────
# LOGGING
# ─────────────────────────────────────────────

_LOG = logging.getLogger('grok_scraper')
_LOG.setLevel(logging.INFO)
_fmt = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
_ch = logging.StreamHandler()
_ch.setFormatter(_fmt)
_LOG.addHandler(_ch)
_fh = logging.FileHandler('grok_scraper.log')
_fh.setFormatter(_fmt)
_LOG.addHandler(_fh)

# ─────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────


def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def uuid_from_assets_url(url: str) -> str | None:
    """
    Extract generation UUID from URLs like:
        assets.grok.com/users/{user}/generated/{uuid}/preview_image.jpg
        imagine-public.x.ai/imagine-public/images/{uuid}.jpg
        imagine-public.x.ai/imagine-public/share-videos/{uuid}.mp4

    Returns None when no 36-char hex-and-dash UUID is found in a known
    path segment.
    """
    m = re.search(r'/generated/([0-9a-f-]{36})/', url)
    if m:
        return m.group(1)
    m = re.search(r'/(?:images|share-videos|share-images)/([0-9a-f-]{36})', url)
    if m:
        return m.group(1)
    return None


def clean_url(url: str) -> str:
    """Strip ?cache=1 and similar cache-busting params."""
    return url.split('?')[0]


# ─────────────────────────────────────────────
# DATABASE
# ─────────────────────────────────────────────


def init_db(db_path: str) -> sqlite3.Connection:
    """Open (creating if needed) the SQLite DB and ensure both tables exist."""
    con = sqlite3.connect(db_path)
    con.row_factory = sqlite3.Row
    con.executescript("""
        CREATE TABLE IF NOT EXISTS images (
            uuid             TEXT PRIMARY KEY,
            type             TEXT NOT NULL,
            preview_url      TEXT,
            latest_video_url TEXT,
            description      TEXT,
            prompt           TEXT,
            pass1_at         TEXT,
            pass2_at         TEXT
        );
        CREATE TABLE IF NOT EXISTS videos (
            uuid            TEXT PRIMARY KEY,
            image_uuid      TEXT NOT NULL,
            prompt          TEXT,
            description     TEXT,
            share_video_url TEXT,
            preview_url     TEXT,
            pass3_at        TEXT,
            downloaded_at   TEXT,
            FOREIGN KEY (image_uuid) REFERENCES images(uuid)
        );
    """)
    con.commit()
    return con


# ─────────────────────────────────────────────
# BROWSER
# ─────────────────────────────────────────────


async def make_context(playwright):
    """
    Launch headless Chrome and return (browser, context) with the user's
    exported cookies loaded.

    Raises FileNotFoundError when the cookies file is missing.
    """
    browser = await playwright.chromium.launch(
        channel='chrome',
        headless=True,
        args=['--disable-gpu', '--no-sandbox'],
    )
    context = await browser.new_context(
        viewport={'width': CFG.browser.viewport_width,
                  'height': CFG.browser.viewport_height},
        user_agent=CFG.browser.user_agent,
    )
    if not os.path.exists(CFG.cookies_file):
        raise FileNotFoundError(
            f'{CFG.cookies_file} not found — export your Grok cookies first.'
        )
    with open(CFG.cookies_file, encoding='utf-8') as f:
        cookies = json.load(f)
    await context.add_cookies(cookies)
    _LOG.info('Browser context ready with cookies.')
    return browser, context


async def get_meta(page: Page, name: str | None = None,
                   prop: str | None = None) -> str | None:
    """Read a <meta> tag's content by name= or property= attribute.

    Returns None when neither selector argument is given or the tag is
    absent from the page.
    """
    if name:
        loc = page.locator(f'meta[name="{name}"]')
    elif prop:
        loc = page.locator(f'meta[property="{prop}"]')
    else:
        return None
    if await loc.count() == 0:
        return None
    return await loc.get_attribute('content')


class AuthError(Exception):
    """Raised when cookies are expired or an HTTP-level auth failure occurs."""


class NavigationError(Exception):
    """Raised when the React app navigates away from the expected URL after hydration."""


async def check_auth(page: Page, expected_url: str, response=None):
    """
    Call immediately after page.goto(). Raises AuthError if the HTTP response
    landed somewhere unexpected.

    This catches server-side redirects only — not client-side React
    navigation. Use assert_url() after hydration to catch the latter.
    """
    if response is None:
        return
    actual_http = response.url.rstrip('/')
    if actual_http == expected_url.rstrip('/'):
        return
    # Dump as much diagnostic context as we can before raising.
    _LOG.warning(f'  Final HTTP status : {response.status}')
    _LOG.warning(f'  Final HTTP URL    : {response.url}')
    try:
        # Walk the redirect chain backwards (latest request first).
        req = response.request
        hops = []
        while req is not None:
            hops.append(req.url)
            req = req.redirected_from
        if len(hops) > 1:
            _LOG.warning(f'  Redirect chain ({len(hops)} hops):')
            for hop in reversed(hops):
                _LOG.warning(f'    {hop}')
    except Exception:
        # Diagnostics are best-effort; never mask the AuthError below.
        pass
    try:
        body = await response.text()
        snippet = body[:400].replace('\n', ' ').strip()
        _LOG.warning(f'  Response body snippet: {snippet}')
    except Exception:
        pass
    raise AuthError(
        f'HTTP redirect detected.\n'
        f'  Requested : {expected_url}\n'
        f'  Landed at : {response.url}\n'
        f'  Cookies may be expired or the endpoint requires login.'
    )


def assert_url(page: Page, expected_url: str):
    """
    Call after page hydration is complete (networkidle or domcontentloaded).
    Raises NavigationError if the React app navigated away from expected_url.

    This is a synchronous check — no await needed.
    """
    actual = page.url.rstrip('/')
    if actual != expected_url.rstrip('/'):
        raise NavigationError(
            f'React navigated away from expected page.\n'
            f'  Expected : {expected_url}\n'
            f'  Landed at: {page.url}\n'
            f'  Check that your session cookies are complete (including HttpOnly cookies).'
        )


async def download_file(context, url: str, dest: Path) -> bool:
    """Download a URL using the authenticated browser context. Returns True on success.

    Existing files are never re-downloaded. HTTP 403 is escalated to
    AuthError (expired cookie); any other failure is logged and returns False.
    """
    if dest.exists():
        _LOG.debug(f'Already exists, skipping: {dest.name}')
        return True
    try:
        resp = await context.request.get(url, timeout=60_000)
        if resp.ok:
            dest.write_bytes(await resp.body())
            _LOG.info(f'Downloaded: {dest.name}')
            return True
        elif resp.status == 403:
            raise AuthError(
                f'HTTP 403 downloading {url}\n'
                f'  → Cookie may be expired or lacks permission for this asset.\n'
                f'  → Re-export cookies from your browser and retry.'
            )
        else:
            _LOG.warning(f'Download failed ({resp.status}): {url}')
            return False
    except AuthError:
        raise
    except Exception as e:
        _LOG.error(f'Download error for {url}: {e}')
        return False


# ─────────────────────────────────────────────
# PASS 1 — FAVORITES LIST
# ─────────────────────────────────────────────


async def pass1(con: sqlite3.Connection, redo: bool, target_uuid: str | None):
    """Scrape the favorites grid: scroll to load every card, then process each.

    Aborts entirely on AuthError/NavigationError since nothing downstream
    would succeed with a dead session.
    """
    Path(CFG.output_dir).mkdir(parents=True, exist_ok=True)
    async with async_playwright() as pw:
        browser, context = await make_context(pw)
        page = await context.new_page()
        try:
            _LOG.info(f'Navigating to {CFG.favorites_url}')
            resp = await page.goto(CFG.favorites_url, wait_until='networkidle',
                                   timeout=60_000)
            await check_auth(page, CFG.favorites_url, resp)
            assert_url(page, CFG.favorites_url)

            # Infinite-scroll until the page height stops growing.
            _LOG.info('Scrolling to load all cards...')
            for attempt in range(1, CFG.scroll.max_attempts + 1):
                prev_h = await page.evaluate('document.body.scrollHeight')
                await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
                await page.wait_for_timeout(CFG.scroll.pause_ms)
                new_h = await page.evaluate('document.body.scrollHeight')
                if new_h == prev_h:
                    _LOG.info(f'Scroll complete after {attempt} attempts.')
                    break

            items = page.locator('[role="list"] [role="listitem"]')
            count = await items.count()
            _LOG.info(f'Found {count} cards.')
            for i in range(count):
                card = items.nth(i)
                await _process_card_pass1(card, con, context, redo, target_uuid)
        except NavigationError as e:
            _LOG.critical(f'NAVIGATION ERROR — aborting Pass 1.\n{e}')
        except AuthError as e:
            _LOG.critical(f'AUTHENTICATION FAILURE — aborting Pass 1.\n{e}')
        finally:
            await browser.close()
    _LOG.info('Pass 1 complete.')


async def _process_card_pass1(card, con, context, redo, target_uuid):
    """Extract UUID + URLs from one favorites card, upsert it, fetch preview.

    Errors are logged per-card so a single bad card never aborts the pass.
    """
    try:
        # Detect card type
        has_video = await card.locator('video').count() > 0
        if has_video:
            # Video card — UUID comes from the video src path
            video_el = card.locator('video').first
            video_src = await video_el.get_attribute('src') or ''
            img_el = card.locator('img').first
            img_src = await img_el.get_attribute('src') or ''
            uuid = uuid_from_assets_url(video_src) or uuid_from_assets_url(img_src)
            if not uuid:
                _LOG.warning(f'Could not extract UUID from video card. src={video_src}')
                return
            preview_url = clean_url(img_src)
            latest_video_url = clean_url(video_src)
            card_type = 'video'
        else:
            # Image card — UUID comes from the img src path
            img_el = card.locator('img').first
            img_src = await img_el.get_attribute('src') or ''
            uuid = uuid_from_assets_url(img_src)
            if not uuid:
                # imagine-public.x.ai URL — UUID is the filename stem
                m = re.search(r'/([0-9a-f-]{36})\.jpg', img_src)
                uuid = m.group(1) if m else None
            if not uuid:
                _LOG.warning(f'Could not extract UUID from image card. src={img_src}')
                return
            preview_url = clean_url(img_src)
            latest_video_url = None
            card_type = 'image'

        if target_uuid and uuid != target_uuid:
            return

        # Check if already done
        row = con.execute('SELECT pass1_at FROM images WHERE uuid = ?',
                          (uuid,)).fetchone()
        if row and row['pass1_at'] and not redo:
            _LOG.debug(f'Pass 1 already done for {uuid}, skipping.')
            return

        _LOG.info(f'Pass 1 processing {card_type} card: {uuid}')

        # Upsert into images table
        con.execute("""
            INSERT INTO images (uuid, type, preview_url, latest_video_url, pass1_at)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(uuid) DO UPDATE SET
                type = excluded.type,
                preview_url = excluded.preview_url,
                latest_video_url = excluded.latest_video_url,
                pass1_at = excluded.pass1_at
        """, (uuid, card_type, preview_url, latest_video_url, now_iso()))
        con.commit()

        # Download preview image
        ext = 'jpg'
        dest = Path(CFG.output_dir) / f'{uuid}_preview.{ext}'
        await download_file(context, preview_url, dest)
    except Exception as e:
        _LOG.error(f'Pass 1 card error: {e}', exc_info=True)


# ─────────────────────────────────────────────
# PASS 2 — DETAIL PAGES (JS hydration)
# ─────────────────────────────────────────────


async def pass2(con: sqlite3.Connection, redo: bool, target_uuid: str | None):
    """Visit each image's detail page; collect metadata and sibling video UUIDs.

    Pages are processed concurrently, bounded by CFG.concurrency.
    """
    # Parameterized filter — never interpolate user input into SQL.
    query = 'SELECT uuid, type FROM images WHERE pass1_at IS NOT NULL'
    params: list = []
    if not redo:
        query += ' AND pass2_at IS NULL'
    if target_uuid:
        query += ' AND uuid = ?'
        params.append(target_uuid)
    rows = con.execute(query, params).fetchall()
    _LOG.info(f'Pass 2: {len(rows)} images to process.')
    if not rows:
        return

    async with async_playwright() as pw:
        browser, context = await make_context(pw)
        sem = asyncio.Semaphore(CFG.concurrency)

        async def process(row):
            async with sem:
                page = await context.new_page()
                try:
                    await _process_image_pass2(row, page, context, con)
                finally:
                    await page.close()

        await asyncio.gather(*[process(r) for r in rows], return_exceptions=True)
        await browser.close()
    _LOG.info('Pass 2 complete.')


async def _process_image_pass2(row, page: Page, context, con: sqlite3.Connection):
    """Process one image detail page.

    Image cards: static head only, grab the meta description.
    Video cards: full hydration, harvest sibling video UUIDs from the
    thumbnail strip and seed the `videos` table.
    """
    uuid = row['uuid']
    card_type = row['type']
    url = f'https://grok.com/imagine/post/{uuid}'
    try:
        _LOG.info(f'Pass 2: {card_type} {uuid}')
        if card_type == 'image':
            # Static head is sufficient — no JS needed for description
            resp = await page.goto(url, wait_until='domcontentloaded',
                                   timeout=60_000)
            await check_auth(page, url, resp)
            description = await get_meta(page, name='description')
            con.execute("""
                UPDATE images SET description = ?, pass2_at = ? WHERE uuid = ?
            """, (description, now_iso(), uuid))
            con.commit()
            _LOG.info(f'  Image description: {(description or "")[:80]}')
        else:
            # Video card — need full hydration to render the sibling strip
            resp = await page.goto(url, wait_until='networkidle', timeout=90_000)
            await check_auth(page, url, resp)

            # Get the prompt for the latest video from the meta description
            prompt = await get_meta(page, name='description')

            # Wait for the sibling strip to appear
            strip_sel = '.snap-y.snap-mandatory'
            try:
                await page.wait_for_selector(strip_sel, timeout=15_000)
            except Exception:
                _LOG.warning(f'  Strip not found for {uuid} — may only have one video.')

            # Collect all video UUIDs from the strip thumbnails
            strip = page.locator(strip_sel).first
            thumbnails = (strip.locator('button img')
                          if await page.locator(strip_sel).count() > 0 else None)
            sibling_uuids = []
            if thumbnails:
                thumb_count = await thumbnails.count()
                for i in range(thumb_count):
                    src = await thumbnails.nth(i).get_attribute('src') or ''
                    vid_uuid = uuid_from_assets_url(src)
                    if vid_uuid:
                        sibling_uuids.append(vid_uuid)
            if not sibling_uuids:
                # Fallback: at minimum we know the latest video UUID
                sibling_uuids = [uuid]
                _LOG.warning(f'  No siblings found via strip, using card UUID only.')
            _LOG.info(f'  Found {len(sibling_uuids)} video(s): {sibling_uuids}')

            # Upsert all siblings into videos table
            for vid_uuid in sibling_uuids:
                # Preview URL comes straight from this sibling's thumbnail src;
                # it will be confirmed/overwritten in Pass 3.
                thumb_src = await page.locator(
                    f'button img[src*="{vid_uuid}"]'
                ).first.get_attribute('src') or ''
                preview_url = clean_url(thumb_src) if thumb_src else None
                con.execute("""
                    INSERT INTO videos (uuid, image_uuid, preview_url)
                    VALUES (?, ?, ?)
                    ON CONFLICT(uuid) DO UPDATE SET
                        image_uuid = excluded.image_uuid,
                        preview_url = COALESCE(excluded.preview_url,
                                               videos.preview_url)
                """, (vid_uuid, uuid, preview_url))

            # Update the images row
            con.execute("""
                UPDATE images SET prompt = ?, pass2_at = ? WHERE uuid = ?
            """, (prompt, now_iso(), uuid))
            con.commit()
    except AuthError as e:
        _LOG.critical(f'AUTHENTICATION FAILURE in Pass 2 for {uuid}.\n{e}')
        raise  # re-raise so asyncio.gather surfaces it and we can abort
    except Exception as e:
        _LOG.error(f'Pass 2 error for {uuid}: {e}', exc_info=True)


# ─────────────────────────────────────────────
# PASS 3 — PER-VIDEO PAGES (static head)
# ─────────────────────────────────────────────


async def pass3(con: sqlite3.Connection, redo: bool, target_uuid: str | None):
    """Visit each video's detail page; extract prompt/URL and download the file.

    Pages are processed concurrently, bounded by CFG.concurrency.
    """
    # Parameterized filter — never interpolate user input into SQL.
    query = 'SELECT uuid, image_uuid FROM videos WHERE 1=1'
    params: list = []
    if not redo:
        query += ' AND pass3_at IS NULL'
    if target_uuid:
        query += ' AND uuid = ?'
        params.append(target_uuid)
    rows = con.execute(query, params).fetchall()
    _LOG.info(f'Pass 3: {len(rows)} videos to process.')
    if not rows:
        return

    async with async_playwright() as pw:
        browser, context = await make_context(pw)
        sem = asyncio.Semaphore(CFG.concurrency)

        async def process(row):
            async with sem:
                page = await context.new_page()
                try:
                    await _process_video_pass3(row, page, context, con)
                finally:
                    await page.close()

        await asyncio.gather(*[process(r) for r in rows], return_exceptions=True)
        await browser.close()
    _LOG.info('Pass 3 complete.')


async def _process_video_pass3(row, page: Page, context, con: sqlite3.Connection):
    """Process one video detail page: read meta tags, download the share video."""
    uuid = row['uuid']
    image_uuid = row['image_uuid']
    url = f'https://grok.com/imagine/post/{uuid}'
    try:
        _LOG.info(f'Pass 3: video {uuid}')
        # Static head is sufficient — meta tags are server-rendered
        resp = await page.goto(url, wait_until='domcontentloaded', timeout=60_000)
        await check_auth(page, url, resp)

        prompt = await get_meta(page, name='description')
        share_video_url = await get_meta(page, prop='og:video')
        _LOG.info(f'  Prompt: {(prompt or "")[:80]}')
        _LOG.info(f'  Video URL: {share_video_url}')

        # Download the video
        downloaded_at = None
        if share_video_url:
            dest = Path(CFG.output_dir) / f'{uuid}.mp4'
            ok = await download_file(context, share_video_url, dest)
            if ok:
                downloaded_at = now_iso()

        con.execute("""
            UPDATE videos
            SET prompt = ?, share_video_url = ?, pass3_at = ?,
                downloaded_at = COALESCE(?, downloaded_at)
            WHERE uuid = ?
        """, (prompt, share_video_url, now_iso(), downloaded_at, uuid))
        con.commit()
    except AuthError as e:
        _LOG.critical(f'AUTHENTICATION FAILURE in Pass 3 for {uuid}.\n{e}')
        raise
    except Exception as e:
        _LOG.error(f'Pass 3 error for {uuid}: {e}', exc_info=True)


# ─────────────────────────────────────────────
# CHECK AUTH COMMAND
# ─────────────────────────────────────────────


async def check_auth_cmd(db_path: str):
    """
    Diagnostic command. Verifies cookies are valid by:
      1. Navigating to the favorites page and checking for React navigation
      2. Attempting to download a known authenticated asset
    """
    print('── Grok auth check ──────────────────────────')
    async with async_playwright() as pw:
        browser, context = await make_context(pw)
        page = await context.new_page()
        try:
            # Step 1: page navigation
            print(f'  Navigating to {CFG.favorites_url} ...')
            resp = await page.goto(CFG.favorites_url, wait_until='networkidle',
                                   timeout=60_000)
            await check_auth(page, CFG.favorites_url, resp)
            assert_url(page, CFG.favorites_url)
            print('  ✓ Page navigation OK')

            # Step 2: authenticated asset download
            test_url = None
            # Prefer a real URL from the DB if available
            if os.path.exists(db_path):
                try:
                    con = sqlite3.connect(db_path)
                    row = con.execute(
                        "SELECT preview_url FROM images "
                        "WHERE preview_url LIKE '%assets.grok.com%' LIMIT 1"
                    ).fetchone()
                    con.close()
                    if row:
                        test_url = row[0]
                        print(f'  Using DB asset URL for CDN test.')
                except Exception:
                    pass
            # Fall back to config
            if not test_url:
                test_url = CFG.check_auth.get('fallback_test_url')
            if test_url:
                print(f'  Testing CDN asset fetch ...')
                resp2 = await context.request.get(test_url, timeout=30_000)
                if resp2.ok:
                    print(f'  ✓ CDN asset fetch OK (HTTP {resp2.status})')
                elif resp2.status == 403:
                    print(f'  ✗ CDN asset fetch FAILED (HTTP 403 — check HttpOnly cookies)')
                else:
                    print(f'  ✗ CDN asset fetch FAILED (HTTP {resp2.status})')
            else:
                print('  – CDN asset test skipped (no DB URL and no fallback_test_url in config)')
        except NavigationError as e:
            print(f'  ✗ React navigation error:\n{e}')
        except AuthError as e:
            print(f'  ✗ Auth error:\n{e}')
        except Exception as e:
            print(f'  ✗ Unexpected error: {e}')
        finally:
            await browser.close()
    print('─────────────────────────────────────────────')


# ─────────────────────────────────────────────
# MAIN
# ─────────────────────────────────────────────


def main():
    """CLI entry point: parse args, apply config overrides, dispatch a pass."""
    parser = argparse.ArgumentParser(
        description='Grok Imagine Favorites Scraper',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        '--pass', dest='pass_num', type=int, choices=[1, 2, 3],
        help='Which pass to run (1, 2, or 3)',
    )
    parser.add_argument(
        '--check-auth', action='store_true',
        help='Verify cookies and session are valid, then exit',
    )
    parser.add_argument(
        '--redo', action='store_true',
        help='Re-process rows even if already completed',
    )
    parser.add_argument(
        '--uuid', type=str, default=None,
        help='Process only this specific UUID',
    )
    parser.add_argument(
        '--db', type=str, default=None,
        help='Path to SQLite database (overrides config.yaml)',
    )
    parser.add_argument(
        '--output', type=str, default=None,
        help='Output directory for downloaded files (overrides config.yaml)',
    )
    parser.add_argument(
        '--concurrency', type=int, default=None,
        help='Max concurrent browser pages for Pass 2/3 (overrides config.yaml)',
    )
    args = parser.parse_args()

    # Apply CLI overrides to CFG — must happen before any CFG usage below
    if args.db:
        CFG.db_file = args.db
    if args.output:
        CFG.output_dir = args.output
    if args.concurrency:
        CFG.concurrency = args.concurrency

    if not args.check_auth and args.pass_num is None:
        parser.error('one of --pass or --check-auth is required')

    Path(CFG.output_dir).mkdir(parents=True, exist_ok=True)

    if args.check_auth:
        asyncio.run(check_auth_cmd(CFG.db_file))
        return

    con = init_db(CFG.db_file)
    _LOG.info(f'Starting Pass {args.pass_num} '
              f'(redo={args.redo}, uuid={args.uuid}, db={CFG.db_file})')
    if args.pass_num == 1:
        asyncio.run(pass1(con, args.redo, args.uuid))
    elif args.pass_num == 2:
        asyncio.run(pass2(con, args.redo, args.uuid))
    elif args.pass_num == 3:
        asyncio.run(pass3(con, args.redo, args.uuid))
    con.close()


if __name__ == '__main__':
    main()