Source code for berhoel.imdb_extract

"""Extract series information from the IMDB web page."""

from __future__ import annotations

import argparse
from contextlib import suppress
import csv
from importlib import metadata
from pathlib import Path
import re

from rich.console import Console
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.ui import WebDriverWait

from .unicodewriter import UnicodeWriter

__date__ = "2024/08/03 22:11:35 hoel"
__author__ = "Berthold Höllmann"
__copyright__ = "Copyright © 2011,2013,2023 by Berthold Höllmann"
__credits__ = ["Berthold Höllmann"]
__maintainer__ = "Berthold Höllmann"
__email__ = "berhoel@gmail.com"


EPISODE_HEADER = re.compile(r"Season (?P<season>\d+), Episode (?P<episode>\d+):")
CONSOLE = Console()


[docs] class IMDBEntry: """Handle IMDB episodes information for TV series.""" line = 2 TITLE_MATCH = re.compile(r"S(?P<season>\d+)\.E(?P<episode>\d+) ∙ (?P<title>.+)")
[docs] def __init__(self, season: int, href_text: str, href: str, desc: str): """Initialize class instance.""" self.season = season _match = self.TITLE_MATCH.match(href_text) if _match is None: msg = "Season info not found." raise ValueError(msg) if int(_match.group("season")) != self.season: msg = "Current season does not fit html data." raise ValueError(msg) self.episode = int(_match.group("episode")) self.title = _match.group("title") self.url = href self.url = self.url[: self.url.rfind("?")] self.descr = desc
[docs] def list(self) -> tuple: """Return information list.""" IMDBEntry.line += 1 return ( None, self.season, self.episode, None, None, None, f'=HYPERLINK("{self.url}";"{self.title}")', self.descr, f"=WENN(ODER(ISTLEER(F{IMDBEntry.line});" f'ISTLEER(A{IMDBEntry.line}));"";' f"SVERWEIS(F{IMDBEntry.line};F$3:J$10000;5;0)/" f"SUMMENPRODUKT(((A$3:A$10000)>0)*((F$3:F$10000)=" f"F{IMDBEntry.line})))", )
[docs] def __lt__(self, other: object) -> bool: """Return if instance less than or equal other.""" if not isinstance(other, IMDBEntry): raise TypeError return self.episode < other.episode
[docs] def __le__(self, other: object) -> bool: """Return if instance less than other.""" if not isinstance(other, IMDBEntry): raise TypeError return self.episode <= other.episode
[docs] def __eq__(self, other: object) -> bool: """Return if instance is equal other.""" if not isinstance(other, IMDBEntry): raise TypeError return self.episode == other.episode
[docs] def __ne__(self, other: object) -> bool: """Return if instance is not equal other.""" if not isinstance(other, IMDBEntry): raise TypeError return self.episode != other.episode
[docs] def __gt__(self, other: object) -> bool: """Return if instance is greater than other.""" if not isinstance(other, IMDBEntry): raise TypeError return self.episode > other.episode
[docs] def __ge__(self, other: object) -> bool: """Return if instance is greater or equal than other.""" if not isinstance(other, IMDBEntry): raise TypeError return self.episode >= other.episode
[docs] class IMDBInfo: """Process html page from IMDB."""
[docs] def __init__(self, args: argparse.Namespace): """Initialize instance.""" self.url = self.get_url(args.url[0]) options = Options() options.add_argument("--headless") firefox_profile = FirefoxProfile() firefox_profile.set_preference(key="intl.accept_languages", value=args.language) firefox_profile.set_preference(key="javascript.enabled", value=True) options.profile = firefox_profile self.driver = webdriver.Firefox(options=options) self.series = " ".join(args.title) if len(self.series) == 0: self.series = self.get_series() self.driver.get(self.url)
[docs] def __del__(self) -> None: """Cleuup.""" self.driver.quit()
[docs] def get_url(self, url: Path | str) -> str: """Return preprocessed URL.""" path = Path(url) res = url if path.is_file(): with path.open() as csvfile: inforeader = csv.reader(csvfile) res = next(inforeader)[0].split('"')[1] if not isinstance(res, str): raise TypeError while res.endswith("/"): res = res[:-1] if not res.endswith("/episodes"): res += "/episodes" return res
[docs] def __call__(self) -> None: """Process data.""" self.process_data()
[docs] def process_data(self) -> None: """Generate the csv file.""" wait = WebDriverWait(self.driver, 10) def extract_url(inp: str | None) -> str: if inp is None: raise ValueError return inp.split("=")[-1] seasons = [ int(extract_url(i.get_attribute("href"))) for i in wait.until( ec.element_to_be_clickable( (By.XPATH, '//div[contains(@class,"ipc-tabs--display-chip")]') ) ).find_elements( By.XPATH, '//div[contains(@class,"ipc-tabs--display-chip")]//a' ) ] tbl = "".maketrans("/", "_") with Path(f"{self.series.strip().translate(tbl)}.csv").open("w") as filep: writer = UnicodeWriter(filep, delimiter=";", quoting=csv.QUOTE_MINIMAL) writer.writerow( [f'=HYPERLINK("{self.url.strip()[:-9]}";"{self.series.strip()}")'] ) writer.writerow( [ '=HYPERLINK("#Übersicht";"Datum")', None, None, "Disk", "Index", "Diskset", "IMDB URL", None, "=MITTELWERT(I3:I10000)", "=SUMME(J3:J10000)", ] ) for season in seasons: self.driver.get(f"{self.url}/?season={season}") wait = WebDriverWait(self.driver, 10) CONSOLE.print(f"Season {season}") elem = wait.until( ec.element_to_be_clickable( ( By.XPATH, '//article[contains(@class,"episode-item-wrapper")]', ) ) ) articles = elem.find_elements( By.XPATH, '//article[contains(@class,"episode-item-wrapper")]', ) episodes = [] for article in articles: description_ = article.find_elements( By.XPATH, './/div[@class="ipc-html-content-inner-div"]' ) description = description_[0].text if description_ else "" link = article.find_element( By.XPATH, './/a[@class="ipc-title-link-wrapper"]' ) href = link.get_attribute("href") if not isinstance(href, str): raise TypeError episodes.append(IMDBEntry(season, link.text, href, description)) episodes.sort() for i in episodes: writer.writerow( [j.strip() if isinstance(j, str) else j for j in i.list()] )
[docs] def get_series(self) -> str: """Get Series title.""" url = self.url[:-9] driver = self.driver driver.get(url) wait = WebDriverWait(driver, 10) wrapper = wait.until(ec.presence_of_element_located((By.TAG_NAME, "h1"))) res = wrapper.find_element( By.XPATH, '//h1[contains(@data-testid, "hero__pageTitle")]' ).text with suppress(Exception): res = wrapper.find_element( By.XPATH, '//div[contains(@data-testid, "hero-title-block__original-title")]', ).text.strip() if res.endswith(" (original title)"): res = res[:-17] if res.startswith("Original title: "): res = res[16:] if res.startswith("Originaltitel: "): res = res[15:] return res.strip()
[docs] def get_parser() -> argparse.ArgumentParser: """Create CLI parser.""" parser = argparse.ArgumentParser( description="""Extract IMDB information for TV series. Generates file <Title>.csv. If existing CSV file is given as argument, URL is read from previously generated CSV file.""" ) parser.add_argument( "url", metavar="URL", type=str, nargs=1, help="URL string / existing CSV file" ) parser.add_argument( "title", metavar="TITLE", type=str, nargs="*", help="title string", default="" ) parser.add_argument( "--version", action="version", version=f"%(prog)s {metadata.version('imdb_extract')}", ) parser.add_argument( "-l", "--language", type=str, default="en", help="Language to use for download, default: %(default)s", ) return parser
[docs] def main() -> None: """Main program.""" args = get_parser().parse_args() prog = IMDBInfo(args) CONSOLE.print(f"URL : {prog.url}") CONSOLE.print(f"series: {prog.series}") prog() raise SystemExit
if __name__ == "__main__": main()