#!/usr/bin/python3.8
#
# Check the library for new DVD releases.
#
# Walks the library's "recent acquisitions" RSS search feed, emails any
# titles whose link is not yet recorded in SEEN_FNAME, then records them.
#

import http.client
import urllib.parse
import xml.etree.ElementTree as ET
import email.message
import subprocess

APL_HOST = 'gateway.bibliocommons.com'
PATH_RSS = '/v2/libraries/austin/rss/search'

SEEN_FNAME = '/var/lib/homesvr/library-seen.txt'

# The command-line to send an email based on STDIN.
SENDMAIL_CMD = ('/usr/sbin/sendmail', '-t', '-oi')

# Stop paging once MORE than this many already-seen items appear on one page.
SEEN_THRESHOLD = 10

# Seconds to wait on the feed server before giving up, so an unattended run
# cannot hang forever on a stalled connection.
HTTP_TIMEOUT = 60

# We have seen some problem characters in the feed (notably \x1b and \x19).
# XML 1.0 forbids every control character below 0x20 other than TAB, LF and
# CR, so map all of them to '*' before handing the body to the parser.
_XML_BAD_CHARS = {
    code: '*' for code in range(0x20) if code not in (0x09, 0x0A, 0x0D)
}


def check():
    """Find newly listed DVDs, email them, and remember them.

    The email is sent before the seen-file is saved, so that if sending
    fails the next run starts over from the same state.
    """
    # Every DVD that we've seen lately.
    seen = load_seen()

    conn = http.client.HTTPSConnection(APL_HOST, timeout=HTTP_TIMEOUT)
    try:
        # What will we send email about.
        new_items = _find_new_items(conn, seen)
    finally:
        conn.close()

    if new_items:
        send_email(new_items, seen)
        save_seen(seen)


def _find_new_items(conn, seen):
    """Page through the feed and return the links not present in SEEN.

    SEEN is updated in place (link -> title) for every new link found.
    The returned list keeps the order in which the feed produced the links.
    """
    new_items = []
    page = 1
    while True:
        n_seen = 0
        n_new = 0
        for item in get_items(conn, page):
            link = (item.findtext('link') or '').strip()
            if not link:
                # Nothing to key the item on; it cannot be tracked.
                continue

            title = item.findtext('title') or '(untitled)'
            subtitle = item.findtext('subtitle')
            if subtitle:
                title = f'{title} / {subtitle}'

            if link not in seen:
                seen[link] = title
                new_items.append(link)
                n_new += 1
            else:
                n_seen += 1

        # Sometimes, "seen" DVDs will reappear because a new version comes
        # out, or whatnot. If we have seen more than SEEN_THRESHOLD items
        # in this fetch, then we're likely at the end of new acquisitions.
        if n_seen > SEEN_THRESHOLD:
            break

        # A page that contributed nothing new (including an empty page past
        # the end of the results) means there is nothing further to find.
        # Without this check the loop would never terminate once the feed
        # runs out of items.
        if n_new == 0:
            break

        page += 1

    return new_items


def get_items(conn, page):
    """Fetch one page of the feed and return its <item> elements.

    CONN is an open connection to APL_HOST; PAGE is the 1-based page number.
    Returns a (possibly empty) list of ElementTree elements.

    Raises http.client.HTTPException when the server does not answer 200,
    and xml.etree.ElementTree.ParseError when the body is not valid XML.
    """
    params = {
        'custom_edit': 'false',
        'query': 'formatcode:(DVD )',
        'searchType': 'bl',
        'sort': 'newly_acquired',
        'suppress': 'true',
        'title': 'recent acquisitions',
        'view': 'medium',
    }
    if page == 1:
        params['initialSearch'] = 'true'
    else:
        params['pagination_page'] = str(page)

    # quote_via=quote encodes spaces as %20 rather than '+'.
    encode = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
    path = f'{PATH_RSS}?{encode}'

    conn.request('GET', path)
    r = conn.getresponse()
    raw = r.read()
    if r.status != 200:
        raise http.client.HTTPException(
            f'GET {path} failed: {r.status} {r.reason}')

    # latin-1 decodes any byte sequence without error.
    # NOTE(review): if the feed is really UTF-8 this yields mojibake in
    # non-ASCII titles -- confirm against the live feed before changing.
    body = raw.decode('latin-1').translate(_XML_BAD_CHARS)

    tree = ET.fromstring(body)
    channel = tree.find('channel')
    if channel is None:
        return []
    return channel.findall('item')


def load_seen(fname=SEEN_FNAME):
    """Return dict of URL:TITLE of seen titles.

    Each line of FNAME is "URL TITLE". A missing file yields an empty dict
    (first run); blank lines are skipped, and a line holding only a URL
    gets an empty title.
    """
    seen = {}
    try:
        with open(fname, encoding='utf-8', errors='replace') as fp:
            for line in fp:
                fields = line.split(maxsplit=1)
                if not fields:
                    continue
                seen[fields[0]] = fields[1].strip() if len(fields) > 1 else ''
    except FileNotFoundError:
        pass
    return seen


def save_seen(seen, fname=SEEN_FNAME):
    """Write the URL:TITLE dict SEEN to FNAME, one "URL TITLE" per line.

    Runs of whitespace inside a title (including newlines) are collapsed to
    a single space so that every record stays on one line.
    """
    lines = [f'{link} {" ".join(str(title).split())}\n'
             for link, title in seen.items()]
    if not lines:
        # Keep the historical shape of an empty file: a single newline.
        lines = ['\n']
    with open(fname, 'w', encoding='utf-8') as fp:
        fp.writelines(lines)


def send_email(new_items, seen):
    """Email a bullet list of NEW_ITEMS through SENDMAIL_CMD.

    NEW_ITEMS is an iterable of links; SEEN maps each link to its title.
    Raises subprocess.CalledProcessError if sendmail exits non-zero.
    """
    text = '\n'.join(f'* {seen[link]} ({link})' for link in new_items)

    msg = email.message.EmailMessage()
    msg.set_content(text)
    msg['Subject'] = 'New items at the Library'
    msg['From'] = 'gstein@gmail.com'
    msg['To'] = 'gstein@gmail.com'

    subprocess.run(SENDMAIL_CMD, input=msg.as_bytes(), check=True)


if __name__ == '__main__':
    check()