Die neueste Applikation in meinem Heimnetzwerk ist Apache Airflow. Damit kann ich Tasks ausführen, die regelmäßig passieren sollen, und sich dabei möglichst wenig verändern.

Oder, in anderen Worten: Airflow ist eine fancy (und sehr ressourcen-hungrige) Alternative um Cronjobs auszuführen…
Aber es gibt auch eine Menge Vorteile: Die Jobs werden in Tasks heruntergebrochen. Jeder Task läuft unabhängig voneinander, wodurch sie auf mehrere Worker Nodes aufgeteilt werden können. Zusätzlich gibt es ein schönes User Interface mit einem ziemlich schönen Überblick über alle laufenden und existierenden Jobs.

Motivation

Ich möchte das folgende erreichen:

  • Suche alle Podcasts in Audiobookshelf, mit der Audiobookshelf-API.
  • Ich möchte dass alle Podcasts regelmäßig, nämlich stündlich, auf Updates geprüft werden.
  • Dieses Setting muss, offensichtlich, wieder nach Audiobookshelf zurückgeschrieben werden.

Diese Pipeline ist ziemlich simpel. Ich plane aber, später mehr damit zu machen:

  • Checke wann die letzte Episode von einem Podcast geladen wurde, und melde wenn das zu lange her ist.
    Offensichtlich hängt “zu lange” in diesem Fall vom Podcast ab, und davon wann die vorigen Episoden heruntergeladen wurden. Das muss ich noch herausfinden, wie ich damit umgehen werde.
    Es gibt zwei Gründe gefunden, warum das passieren könnte:
    • Der Podcast hat einfach aufgehört. Schade, aber kann passieren. In diesem Fall macht es keinen Sinn, den Download-Schedule weiterhin aufrecht zu halten, dann sollte ich den lieber abschalten.
    • Oder der Podcast hat Anbieter gewechselt, oder aus einem anderen Grund die Feed URL verändert. Das ist bereits einige Male passiert, und ich habe jeweils lange gebraucht bis ich das kapiert habe.
  • Alte Episoden von manchen Podcasts könnte ich automatisch löschen, wenn ich sie gehört habe.
    Mir ist schon klar dass Audiobookshelf dafür da ist, diese Episoden zu horten. Aber mein Speicherplatz ist nun mal endlich, und ich habe jetzt schon 100GB Podcasts geladen. Das ist einfach zu viel.

Setup

Zuerst muss ich die Audiobookshelf API Verbindung in Airflow festlegen. Da ich Passworte nicht direkt im DAG sichern möchte, nutze ich das Konzept Connections von Airflow. Die habe ich mit den folgenden Einstellungen erzeugt:

  • Connection ID: audiobookshelf. Diese ID ist wichtig, die werden wir später wieder benötigen.
  • Connection Type: HTTP
  • Description: … Eine Beschreibung, damit wir wissen was passiert.
  • Host: audiobookshelf.tech-tales.blog
  • Schema: https (da ich auch im Heimnetzwerk überall TLS-Verschlüsselung habe, fancy.)
  • Extras: Das ist der wichtige Teil, da hier tatsächlich das Passwort gesichert wird. Audiobookshelf nutzt Bearer-Tokens für die Authentifizierung, also setzen wir den entsprechenden Header. Das schaut bei mir so aus:
    {
        "Authorization": "Bearer ..."
    }
    

Der DAG

Ich habe den DAG in vier Teile aufgeteilt:

  • Sammle Informationen über die Library. Das war einfach, und ich habe den vordefinierten Operator dafür verwendet:
    poll_library = HttOperator(
        task_id="poll-library",
        http_conn_id="audiobookshelf", # Die Connection ID von oben
        endpoint=f"api/libraries/{LIBRARY_ID}/items",
        method="GET"
    )
    
  • Liste alle Podcasts auf. Dafür habe ich eine Python Funktion als Airflow-Task per Decorator genutzt:
    def get_list_of_podcasts(ti=None):
        response = ti.xcom_pull(task_ids="poll-library")
        body_data = json.loads(response)
        return [
            {"element": element}
            for element in body_data["results"]
            if element["mediaType"] == "podcast"
        ]
    
  • Bereite für jeden Podcast den Update-Request vor. Hier wurde es spannend: Der vorige Task gibt eine Liste von Elementen zurück. In diesem Schritt möchte ich für jedes Element der Liste einen eigenen Task erzeugen, da diese ja im Wesentlichen voneinander unabhängig sind.
    def prepare_endpoint_func(element, **kwargs):
        logging.info(
            "Processing podcast %s",
            element["media"]["metadata"]["title"]
        )
        return f"/api/items/{element['id']}/media"
    prepare_endpoint = PythonOperator.partial(
        task_id="prepare-endpoint",
        python_callable=prepare_endpoint_func,
    ).expand(op_kwargs=get_list_of_podcasts.output)
    
    Es hat einige Zeit gedauert bis ich herausgefunden habe wie das gehen kann, auch wenn am Ende nur wenige Zeilen Code dastehen…
    Dadurch habe ich aber eine Menge über XCom und anderes Airflow-Zeug gelernt, und das war es mir wert.
  • Schlussendlich kommt noch der Update-Call für jeden Podcast. Das sollte wieder in mehrere Tasks aufgespalten werden, also nutze ich das partial().expand() Konzept erneut:
    update_podcast_request = HttpOperator.partial(
        task_id="update-podcast-request",
        http_conn_id="audiobookshelf_chris",
        method="PATCH",
        data={
            "autoDownloadEpisodes": True,
            "autoDownloadSchedule": "0 * * * *"
        },
        log_response=False
    ).expand(endpoint=prepare_endpoint.output)
    

Aktuell läuft dieser DAG einmal pro Woche und aktualisiert alle Podcasts. Für jetzt genügt das, aber ich freue mich darauf, die erwähnten Erweiterungen zu implementieren.