The latest application running in my homelab is Apache Airflow. Its purpose is to run tasks that recur on a regular basis and do not really change between runs.
So, to break it down: Airflow is a fancy (and very resource-hungry) way to run cronjobs…
But it comes with some advantages: Airflow jobs are broken down into tasks. These tasks run independently of each other, which allows them to be distributed over multiple worker nodes. It also has a quite nice UI with a comprehensive overview of the tasks.
Motivation
I essentially want to achieve the following:
- Fetch all podcasts in Audiobookshelf via the API.
- For every podcast, check that its automatic download schedule is set to `hourly`.
- Push this setting for each podcast back to Audiobookshelf.
This is quite a simple pipeline. I plan to adapt it and add quite a lot more functionality, such as:
- Check when the last episode of the podcast was downloaded. If this is “too long ago”, report that.
Note that “too long ago” depends on the podcast itself and on the episodes it published previously. I still have to figure out how to determine that; a rough idea is sketched after this list.
I found two important reasons why a podcast may stop producing episodes:
- The podcast stopped completely. A pity, but this obviously can happen. In this case, a schedule for auto-downloading might not be required any more; maybe I should even remove the schedule explicitly?
- The podcast moved to a different provider, and the URL I have stored is no longer valid. I have run into this a few times already, and it took me a while to notice.
- Delete old episodes that I have already listened to from my server.
I realize that Audiobookshelf is meant to keep them downloaded, but I do not have infinite storage space here. Currently, my podcast library takes up about 100 GB of space…
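One heuristic I am considering for the “too long ago” check: compare how long the podcast has been silent with the typical gap between its previous episodes. This is only a sketch in plain Python, not an Airflow task, and it assumes the episode publication times are available as timezone-aware datetimes:

```python
from datetime import datetime, timezone
from statistics import median


def is_too_long_ago(publish_times: list[datetime], factor: float = 2.0) -> bool:
    """Flag a podcast whose silence is much longer than its usual release gap.

    publish_times: publication times of the known episodes (timezone-aware).
    factor: how much longer than the typical gap counts as "too long".
    """
    if len(publish_times) < 3:
        return False  # not enough history to judge
    times = sorted(publish_times)
    gaps = [later - earlier for earlier, later in zip(times, times[1:])]
    typical_gap = median(gaps)
    silence = datetime.now(timezone.utc) - times[-1]
    return silence > factor * typical_gap
```

A weekly podcast would then only be flagged after roughly two weeks of silence, while a daily one would already be flagged after a couple of days.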
Setup
First, I have to set up the Audiobookshelf API connection. I do not want to store secrets in a DAG, and Airflow has a solution for that called Connections. So I created a connection for the Audiobookshelf API with the following settings:
- Connection ID: `audiobookshelf`. Choose something here and remember it; we will need it again later on.
- Connection Type: `HTTP`
- Description: … Something so that you know what is going on.
- Host: `audiobookshelf.tech-tales.blog`
- Schema: `https` (fancy me, encrypting everything inside my home network)
- Extra: Now this is important, as it stores the credentials. The Audiobookshelf API uses Bearer tokens as credentials, so we have to add the respective HTTP header element. My connection hence looks as follows:

```json
{
  "Authorization": "Bearer ..."
}
```
The DAG
I have split up my DAG into four tasks:
- Poll the library. This one was quite simple, and I could use an existing operator for it:
```python
poll_library = HttpOperator(
    task_id="poll-library",
    http_conn_id="audiobookshelf",  # The connection ID from above
    endpoint=f"api/libraries/{LIBRARY_ID}/items",
    method="GET",
)
```
- Get a list of podcasts. This one is a Python function, decorated as a task:
```python
@task
def get_list_of_podcasts(ti=None):
    response = ti.xcom_pull(task_ids="poll-library")
    body_data = json.loads(response)
    return [
        {"element": element}
        for element in body_data["results"]
        if element["mediaType"] == "podcast"
    ]
```
- Prepare the request for each podcast on its own. This one became interesting: the previous task returns a list of podcast elements, and in this step I want to execute a separate task for each element of that list; they don’t have to be executed all at once. It took me a long time to figure that out, although it is just these few lines of code in the end…
```python
def prepare_endpoint_func(element, **kwargs):
    logging.info(
        "Processing podcast %s", element["media"]["metadata"]["title"]
    )
    return f"/api/items/{element['id']}/media"


prepare_endpoint = PythonOperator.partial(
    task_id="prepare-endpoint",
    python_callable=prepare_endpoint_func,
).expand(op_kwargs=get_list_of_podcasts())
```
Still, I learned quite a lot about XCom and other Airflow concepts along the way, so this is probably fine.
- Do the update request. Again, I wanted to create multiple tasks from this one, so I used the `partial().expand()` concept:

```python
update_podcast_request = HttpOperator.partial(
    task_id="update-podcast-request",
    http_conn_id="audiobookshelf",  # The connection ID from above
    method="PATCH",
    data={
        "autoDownloadEpisodes": True,
        "autoDownloadSchedule": "0 * * * *"
    },
    log_response=False,
).expand(endpoint=prepare_endpoint.output)
```
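For reference, the pieces end up in a single DAG file roughly like this. The imports, the dag_id, the start date, and the explicit dependency between the first two tasks are boilerplate I added for this sketch; the task definitions themselves are the snippets from above:

```python
# Rough skeleton of the complete DAG file. The dag_id and start_date are
# illustrative; the task definitions match the snippets above.
import json
import logging
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import HttpOperator

LIBRARY_ID = "..."  # the Audiobookshelf library ID


@dag(schedule="@weekly", start_date=datetime(2024, 1, 1), catchup=False)
def audiobookshelf_podcast_schedules():
    poll_library = HttpOperator(
        task_id="poll-library",
        http_conn_id="audiobookshelf",
        endpoint=f"api/libraries/{LIBRARY_ID}/items",
        method="GET",
    )

    @task
    def get_list_of_podcasts(ti=None):
        response = ti.xcom_pull(task_ids="poll-library")
        body_data = json.loads(response)
        return [
            {"element": element}
            for element in body_data["results"]
            if element["mediaType"] == "podcast"
        ]

    def prepare_endpoint_func(element, **kwargs):
        logging.info(
            "Processing podcast %s", element["media"]["metadata"]["title"]
        )
        return f"/api/items/{element['id']}/media"

    podcasts = get_list_of_podcasts()
    # The XCom pull above needs an explicit dependency on the polling task.
    poll_library >> podcasts

    prepare_endpoint = PythonOperator.partial(
        task_id="prepare-endpoint",
        python_callable=prepare_endpoint_func,
    ).expand(op_kwargs=podcasts)

    HttpOperator.partial(
        task_id="update-podcast-request",
        http_conn_id="audiobookshelf",
        method="PATCH",
        data={
            "autoDownloadEpisodes": True,
            "autoDownloadSchedule": "0 * * * *",
        },
        log_response=False,
    ).expand(endpoint=prepare_endpoint.output)


audiobookshelf_podcast_schedules()
```

The explicit `poll_library >> podcasts` line is needed because the second task pulls the response via XCom instead of receiving it as an argument, so Airflow cannot infer that ordering on its own.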
At the moment, this DAG runs once a week and updates all podcasts. I am fine with that for now, but I am looking forward to implementing the extensions I mentioned above.