#!/usr/bin/python3
#
# Check the library for new DVD releases.
#
# SAMPLE:
# https://gateway.bibliocommons.com/v2/libraries/austin/rss/search?query=formatcode%3A%28DVD%20%29&searchType=bl&custom_edit=false&sort=newly_acquired&suppress=true&title=recent%20acquisitions&view=grouped
#
# https://gateway.bibliocommons.com/v2/libraries/austin/rss/search?query=formatcode%3A%28DVD%20%29&searchType=bl&custom_edit=false&sort=newly_acquired&suppress=true&origin=user-saved-search&title=recent%20acquisitions&view=grouped&_gl=1*18l1qem*_ga*MzAxNDY2NTA4LjE3MTgxMTkyMDI.*_ga_G99DMMNG39*MTcxODMzNzc4My4zLjEuMTcxODMzNzgzNi4wLjAuMA..
# /v2/libraries/austin/rss/search?custom_edit=false&query=formatcode%3A%28DVD%20%29&searchType=bl&sort=newly_acquired&suppress=true&origin=user-saved-search&title=recent%20acquisitions&view=grouped&_gl=1%2A18l1qem%2A_ga%2AMzAxNDY2NTA4LjE3MTgxMTkyMDI.%2A_ga_G99DMMNG39%2AMTcxODMzNzc4My4zLjEuMTcxODMzNzgzNi4wLjAuMA..
#

import urllib.parse
import xml.etree.ElementTree as ET
import email.message
import subprocess

import requests

# Austin Public Library
# (An earlier revision used host 'austin.bibliocommons.com' with path
# '/v2/search'; the gateway RSS endpoint below is the one in use.)
APL_HOST = 'gateway.bibliocommons.com'
PATH_RSS = '/v2/libraries/austin/rss/search'
QUERY_URL = f'https://{APL_HOST}{PATH_RSS}'
# https://gateway.bibliocommons.com/v2/libraries/austin/rss/search?query=formatcode%3A%28DVD%20%29&searchType=bl&custom_edit=false&suppress=true&sort=newly_acquired&origin=user-saved-search&title=tst&view=small&_gl=1*1rnhavp*_ga*MTY5NjI2NTczMC4xNjUyNjQzMTgw*_ga_G99DMMNG39*MTcxNDI2MjM1OC4xODAuMS4xNzE0MjYyNzYxLjAuMC4w
# https://gateway.bibliocommons.com/v2/libraries/austin/rss/search?query=formatcode%3A%28DVD%20%29&searchType=bl&custom_edit=false&suppress=true&sort=newly_acquired&origin=user-saved-search&title=tst&view=small

SEEN_FNAME = '/var/lib/homesvr/library-seen.txt'

# The command-line to send an email based on STDIN.
SENDMAIL_CMD = ('/usr/sbin/sendmail', '-t', '-oi')

# Sender and recipient of the notification email.
MAIL_FROM = 'gstein@gmail.com'
MAIL_TO = 'gstein@gmail.com'

# Stop paging once a single page holds more than this many seen items.
SEEN_THRESHOLD = 10

# Hard cap on pages fetched per run, so a first run (empty seen file) or a
# change in the feed cannot make us page forever.
MAX_PAGES = 50

# Seconds to wait on the feed before giving up; without a timeout a stalled
# connection would hang the job indefinitely.
HTTP_TIMEOUT = 30

# We have seen some problem characters in the feed. Replace them.
_BAD_CHARS = str.maketrans({'\x1b': '*', '\x19': '*'})


def check():
    """Fetch newly acquired DVDs, email the unseen ones, and record them.

    Pages through the feed until a page holds more than SEEN_THRESHOLD
    already-seen items, the feed returns an empty page, or MAX_PAGES pages
    have been fetched.
    """
    # Every DVD that we've seen lately.
    seen = load_seen()

    # What will we send email about. A list (not a set) keeps feed order,
    # so the email lists items the way the library does.
    new_items = []

    with requests.Session() as session:
        for page in range(1, MAX_PAGES + 1):
            items = get_items(page, session)
            if not items:
                # Ran off the end of the feed; nothing more to look at.
                break

            n_seen = 0
            for item in items:
                link = (item.findtext('link') or '').strip()
                if not link:
                    # The link is our key; an item without one is unusable.
                    continue
                if link in seen:
                    n_seen += 1
                else:
                    seen[link] = _item_title(item)
                    new_items.append(link)

            # Sometimes, "seen" DVDs will reappear because a new version
            # comes out, or whatnot. If we have seen more than
            # SEEN_THRESHOLD items in this fetch, then we're likely at the
            # end of new acquisitions.
            if n_seen > SEEN_THRESHOLD:
                break

    # Send email before saving SEEN, so if a failure occurs, we will start
    # over where we began.
    if new_items:
        send_email(new_items, seen)

    save_seen(seen)


def _item_title(item):
    """Return a single-line display title for a feed ITEM element."""
    title = item.findtext('title') or ''
    subtitle = item.findtext('subtitle')
    if subtitle:
        title = f'{title} / {subtitle}'
    # The seen file is line-oriented, so collapse any embedded newlines or
    # runs of whitespace; never return an empty title.
    return ' '.join(title.split()) or '(untitled)'


def get_items(page, session=None):
    """Return the list of <item> elements on PAGE of the acquisitions feed.

    PAGE is the 1-based page number. SESSION is an optional
    requests.Session to reuse across calls; when omitted, a temporary one
    is created and closed. Raises requests.RequestException on network or
    HTTP errors and xml.etree.ElementTree.ParseError on a malformed body.
    """
    params = {
        'custom_edit': 'false',
        'query': 'formatcode:(DVD )',
        'searchType': 'bl',
        'sort': 'newly_acquired',
        'suppress': 'true',
        'origin': 'user-saved-search',
        'title': 'recent acquisitions',
        #'view': 'small',
        'view': 'grouped',
        #'_gl': '1*1rnhavp*_ga*MTY5NjI2NTczMC4xNjUyNjQzMTgw*_ga_G99DMMNG39*MTcxNDI2MjM1OC4xODAuMS4xNzE0MjYyNzYxLjAuMC4w',
    }
    # The first page takes no pagination parameter. (Sending
    # 'initialSearch': 'true' for page 1 was tried and is left disabled.)
    if page != 1:
        params['pagination_page'] = str(page)

    headers = {
        # 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        # 'Accept-Encoding': 'gzip, deflate, br, zstd',
        # 'Accept-Language': 'en-US,en;q=0.9',
        # 'Cookie': 'NERF_SRV=nerf07; SRV=app04; _ga=GA1.1.1568735397.1718337851; _ga_967LXNJZ75=GS1.1.1718337851.1.1.1718337990.0.0.0; _ga_G99DMMNG39=GS1.1.1718337852.1.1.1718337990.0.0.0; branch=%7B%22ip%22%3A%2270.114.241.77%22%2C%22austin%22%3Anull%7D',
        # 'Priority': 'u=0, i',
        # 'User-Agent': 'Mozilla/5.0 (X11; CrOS x86_64 14541.0.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36',
    }

    owns_session = session is None
    if owns_session:
        session = requests.Session()
    try:
        body = _fetch_feed(session, params, headers)
    finally:
        if owns_session:
            session.close()

    return _parse_items(body)


def _fetch_feed(session, params, headers):
    """GET the feed with PARAMS as the query string; return the body text."""
    # PARAMS belong in the query string only; a GET should not also carry
    # them as a form-encoded body.
    req = requests.Request('GET', QUERY_URL, params=params, headers=headers)
    pr = session.prepare_request(req)
    # NOTE(review): this resets the prepared headers to exactly HEADERS,
    # dropping the session defaults (User-Agent, Accept-Encoding, ...).
    # Kept because the original did so -- confirm whether it is deliberate.
    pr.prepare_headers(headers)
    print('URL:', pr.url)

    r = session.send(pr, timeout=HTTP_TIMEOUT)
    print('RESULT:', r.status_code, r.reason)
    # Fail with a clear HTTP error rather than an XML parse error on an
    # error page.
    r.raise_for_status()
    return r.text


def _parse_items(body):
    """Parse the RSS text BODY and return its list of <item> elements."""
    body = body.translate(_BAD_CHARS)
    print('LEN:', len(body))
    print(f'BODY: "{body[:40]}"')

    channel = ET.fromstring(body).find('channel')
    if channel is None:
        # Well-formed XML, but not the RSS shape we expect.
        return []
    return channel.findall('item')


def load_seen():
    """Return dict of URL:TITLE of seen titles.

    A missing file (e.g. the first run) yields an empty dict. Blank lines
    are skipped, and a line holding only a URL maps to an empty title.
    """
    seen = {}
    try:
        fp = open(SEEN_FNAME, encoding='utf-8')
    except FileNotFoundError:
        return seen

    with fp:
        for line in fp:
            parts = line.split(maxsplit=1)
            if not parts:
                continue
            seen[parts[0]] = parts[1].strip() if len(parts) > 1 else ''
    return seen


def save_seen(seen):
    """Write the SEEN dict to SEEN_FNAME, one "URL TITLE" pair per line."""
    with open(SEEN_FNAME, 'w', encoding='utf-8') as fp:
        fp.writelines(f'{link} {title}\n' for link, title in seen.items())


def send_email(new_items, seen):
    """Email a bulleted list of NEW_ITEMS (links), titled from SEEN.

    Raises subprocess.CalledProcessError if sendmail exits non-zero.
    """
    text = '\n'.join(f'* {seen[link]} ({link})' for link in new_items)

    msg = email.message.EmailMessage()
    msg.set_content(text)
    msg['Subject'] = 'New items at the Library'
    msg['From'] = MAIL_FROM
    msg['To'] = MAIL_TO

    subprocess.run(SENDMAIL_CMD, input=msg.as_bytes(), check=True)


if __name__ == '__main__':
    # NOTE(review): the real entry point is currently disabled; the script
    # only fetches and prints the first page for debugging. Re-enable
    # check() once debugging is done.
    #check()
    print('ITEMS:', get_items(1))