ETL процессы с Apache Airflow
Apache Aiflow один из наиболее часто встречающихся и востребованных инструментов для организации ETL процессов. Освоение данной системы, на мой взгляд даже «на минималках», даёт существенный прирост Вам как специалисту, а организации, в которой вы работаете, упрощает/ускоряет/систематизирует процессы сбора и обработки данных.
Здесь не будет обучающих материалов по самому Aiflow, ограничусь лишь полезными ссылками для погружения в данный продукт. Данная страница про описание и реализацию конкретных DAG-ов, связанных с высшим образованием.
DAG собирающий НИОКТР, РИД и диссертации
Благодаря имеющимся публичным данным в ЕГИСУ НИОКТР, можно относительно просто позаимствовать (основные механики описаны тут) довольно большой объём информации об университетах по следующим направлениям:
- Научно исследовательские и опытно конструкторские работы (НИОКТР)
- Результаты интеллектуальной деятельности (РИД)
- Докторские и кандидатские диссертации
Очень важной особенностью данного источника информации, является возможность забрать данные ретроспективно, на что очень хорошо ложится одна из 🤩 потрясающих возможностей Aiflow.
По 👉 ссылке можно ознакомиться с полным кодом DAG-а на GitHub, который собирает данные об университетах связанных с программами «Приоритет 2030» и «Передовые инженерные школы».
На базе данных, получаемых с помощью этого DAG-а, построен 📊 публичный Dashboard в Datalens, который пытается визуализировать ответ на вопрос: 🤔 «На сколько НИОКТР конвертируются в диссертации и РИДы?».
Алгоритм сбора данных
Сбор вакансий по университетам осуществляется ⏰ один раз, 15 числа каждого месяца. При этом, происходит сдвиг на 3 месяца назад и загружаются данные за получившийся полный месяц (например в мае, будут загружаться данные за февраль). При реализации, глядя на получавшиеся данные, решил, что это позволит минимизировать потери в данных, связанные с «внесением данных с опозданием или задним числом».
- Задача
extract_university_list
— извлекает из внутренней БД список университетов и их ID в ЕГИСУ НИОКТР. Альтернативой может быть данная ручка API проекта, гдеid
в блокеrosrid
и есть упомянутый ID. - Задача
extract_universities_actives
— извлекает из данные из ЕГИСУ НИОКТР и сохраняет их на S3. - Задача
transform_actives
— работая с S3, бежит по сформированному в предыдущем пункте массиву данных и формирует один большой JSON-объект. О некоторых компромиссах, влияющих на конечный результат:- Каждому скачанному объекту проставляется флаг
is_executor
. В случае НИОКТР и РИД, в данных про исполнителей должен присутствовать университет, а в случае диссертаций, университет должен являться организацией, которая указана в блокеauthor_organization
. - Для того, чтобы объекты можно было легко сопоставлять друг с другом, из всего множества дат, которые присутствуют в данных, выбрана дата
created_date
из блокаlast_status
.
- Каждому скачанному объекту проставляется флаг
- Задача
soft_delete
— уже присутствующие в БД объекты и совпадающие со скачанными помечаются как удалённые. - Задача
load_actives
— загружает, используя JSON-объект из S3 с активами, в базу данных. - Задача
month_dump
— формирует выгрузку в формате ⬇️ csv и ⬇️ excel за последний скачанный месяц.
DAG собирающий вакансии университетов
Хорошо ❤️ реализованное и описанное HH API позволяет относительно легко и регулярно собирать вакансии компаний, которые используют данный HR-инструмент. На основании полученных данных, на достаточном промежутке времени, можно вполне «качать» какие-то гипотезы, связанные с организациями, попавшими в анализируемую выборку.
По 👉 ссылке можно ознакомиться с полным кодом DAG-а на GitHub, который собирает вакансии университетов связанные с программами «Приоритет 2030» и «Передовые инженерные школы».
На базе данных, получаемых с помощью этого DAG-а, построен 📊 публичный Dashboard в Datalens, на котором можно посмотреть 📈 общую картину по вакансиям и 🔍 поискать более конкретную вакансию в университетах.
Алгоритм сбора вакансий университета
Сбор вакансий по университетам осуществляется ⏰ один раз в неделю по выходным дням.
В работе DAG-а используются ручки HH API, которые не требуют авторизации. Но при первых попытках работы с API в больших объёмах (тысячи запросов), через какое-то время API начинало отвечать HTTP-кодом 401
.
Чтобы снять данное ограничение, авторизуйтесь на портале для разработчиков и зарегистрируйте своё приложение. После одобрения приложения (по моему опыту, 1-2 недели), авторизуйте приложение (процедура простая, буквально один GET-запрос) и получите «токен приложения», который далее используйте при работе с API.
- Задача
extract_university_list
— извлекает из внутренней БД список университетов и их ID на HH. Альтернативой может быть данная ручка API проекта, гдеemployer_id
и есть упомянутый ID. - Задача
extract_lists_of_vacancies_by_university
— извлекает из HH API список вакансий и сохраняет их на S3. - Задача
extract_vacancies_data
— по сохранённому ранее в S3 списку вакансий, из HH API, загружаются подробные данные по каждой вакансии, при этом сохраняя всё получаемое на S3. - Задача
transform_vacancies
— работая с S3, бежит по сформированному в предыдущем пункте массиву данных и формирует один большой JSON-объект, далее сохраняет его на S3. - Задача
soft_delete
— помечает все вакансии, загруженные ранее, как удалённые. - Задача
load_vacancies
— загружает, используя JSON-объект из S3 с вакансиями, в базу данных. - Задача
month_dump
— формирует выгрузку в формате ⬇️ csv и ⬇️ excel за последний месяц.
Вспомогательный DAG по валютам
Из-за того, что заработные платы в вакансиях публикуются не только в рублях, пришлось реализовать DAG работающий с API Центрального банка 🇷🇺 РФ. С кодом DAG-а можно ознакомиться по 👉 ссылке.
Раз в день, ⏰ во второй половине дня, происходит следующее:
- Задача
extract
— обращается в API Центробанка и забираем данные по обменным курсам. - Задача
transform
— трансформирует полученные данные в нужную нам структуру, в т.ч. приводит всё к 1 руб. - Задача
load
— загружаем данные в БД.
Данный DAG очень простой, но используемые в нём данные обладают ретроспективой, что встречается не так часто. Если Вы только начинаете разбираться в Airflow, рекомендую попробовать повторить именно его для начала 👍
DAG работающий с Google Alerts
С помощью Google Alerts можно довольно легко отслеживать новостные потоки по конкретным ключевым словам. Вузам рекомендую отслеживать, как минимум, новостные потоки, связанные со своими названиями (официальные и устоявшиеся), иногда можно узнать много неожиданного и интересного об университете, сотрудниках и студентах.
По 👉 ссылке можно ознакомиться с полным кодом DAG-а на GitHub, который собирает публикации в интернете связанные с программами «Приоритет 2030» и «Передовые инженерные школы».
На базе данных, получаемых с помощью этого DAG-а, построен 📊 публичный Dashboard в Datalens.
Алгоритм сбора публикаций
DAG эксплуатирует RSS-каналы, создаваемые Google Alerts по выбранным запросам. В окружении Aiflow создана переменная с ключом google_alert
, приблизительно следующего содержания:
[
{
"topic": "Приоритет 2030",
"feeds": [
{
"keyword": "приоритет 2030",
"url": "https://www.google.com/alerts/feeds/04816217329661954772/16039731801341369303"
},
{
"keyword": "программа стратегического академического лидерства",
"url": "https://www.google.com/alerts/feeds/04816217329661954772/3998321703702580804"
},
{
"keyword": "priority 2030",
"url": "https://www.google.com/alerts/feeds/04816217329661954772/7829070105676972604"
}
]
},
{
"topic": "Передовые инженерные школы",
"feeds": [
{
"keyword": "передовые инженерные школы",
"url": "https://www.google.com/alerts/feeds/04816217329661954772/7495754828704681196"
}
]
}
]
Далее ⏰ раз в день, во второй половине дня, когда Google Alert точно обновится, происходит следующее:
- Задача
extract
— пробегается по всем RSS потокам из настроек и сохраняет их в S3; - Задача
transform
— извлекает RSS из S3, приводит их данные в необходимый вид, дополнительно пробует загрузить и сохранить первоисточник, и сохраняет получившуюся структуру в S3; - Задача
load
— загружает данные из S3, формирует SQL-запрос и записывает данные в БД; - Задача
month_dump
— формирует выгрузку в формате ⬇️ csv и ⬇️ excel за последний месяц.
Google Alerts будет собирать ссылки на источники по интересующим Вас запросам довольно «топорно», т.е. в данные будут прилетать ложноположительные источники. Почти всегда это связано с анонсами одной новости, где фигурирует отслеживаемый запрос, на страницах другой. По моему опыту, при правильно подобранных ключевых словах, это количество не превышает 10% от вполне корректного потока, что для бесплатного инструмента отлично 👍
Ссылки для изучения Apache Airflow
Скорей всего, даже для «попробовать», потребуются навыки DevOps-а 😎, в частности Docker и Docker Compose. Запасной, стартовый вариант, использовать готовую сборку Airflow из магазина Яндекс.Облака.
- Документация с официального сайта
- Книжка «Apache Airflow и конвейеры обработки данных» (сам читал 👌, возможно стоит с неё и начать)
- Русскоязычная группа в Телеграмм (ребята суровые, но помогут с конкретными вопросами по Airflow)