Перейти к основному содержимому

ETL процессы с Apache Airflow

Apache Aiflow один из наиболее часто встречающихся и востребованных инструментов для организации ETL процессов. Освоение данной системы, на мой взгляд даже «на минималках», даёт существенный прирост Вам как специалисту, а организации, в которой вы работаете, упрощает/ускоряет/систематизирует процессы сбора и обработки данных.

Здесь не будет обучающих материалов по самому Aiflow, ограничусь лишь полезными ссылками для погружения в данный продукт. Данная страница про описание и реализацию конкретных DAG-ов, связанных с высшим образованием.

DAG собирающий НИОКТР, РИД и диссертации

Благодаря имеющимся публичным данным в ЕГИСУ НИОКТР, можно относительно просто позаимствовать (основные механики описаны тут) довольно большой объём информации об университетах по следующим направлениям:

  1. Научно исследовательские и опытно конструкторские работы (НИОКТР)
  2. Результаты интеллектуальной деятельности (РИД)
  3. Докторские и кандидатские диссертации

Очень важной особенностью данного источника информации, является возможность забрать данные ретроспективно, на что очень хорошо ложится одна из 🤩 потрясающих возможностей Aiflow.

По 👉 ссылке можно ознакомиться с полным кодом DAG-а на GitHub, который собирает данные об университетах связанных с программами «Приоритет 2030» и «Передовые инженерные школы».

Dashboard

На базе данных, получаемых с помощью этого DAG-а, построен 📊 публичный Dashboard в Datalens, который пытается визуализировать ответ на вопрос: 🤔 «На сколько НИОКТР конвертируются в диссертации и РИДы?».

Алгоритм сбора данных

Сбор вакансий по университетам осуществляется ⏰ один раз, 15 числа каждого месяца. При этом, происходит сдвиг на 3 месяца назад и загружаются данные за получившийся полный месяц (например в мае, будут загружаться данные за февраль). При реализации, глядя на получавшиеся данные, решил, что это позволит минимизировать потери в данных, связанные с «внесением данных с опозданием или задним числом».

  1. Задача extract_university_list — извлекает из внутренней БД список университетов и их ID в ЕГИСУ НИОКТР. Альтернативой может быть данная ручка API проекта, где id в блоке rosrid и есть упомянутый ID.
  2. Задача extract_universities_actives — извлекает из данные из ЕГИСУ НИОКТР и сохраняет их на S3.
  3. Задача transform_actives — работая с S3, бежит по сформированному в предыдущем пункте массиву данных и формирует один большой JSON-объект. О некоторых компромиссах, влияющих на конечный результат:
    • Каждому скачанному объекту проставляется флаг is_executor. В случае НИОКТР и РИД, в данных про исполнителей должен присутствовать университет, а в случае диссертаций, университет должен являться организацией, которая указана в блоке author_organization.
    • Для того, чтобы объекты можно было легко сопоставлять друг с другом, из всего множества дат, которые присутствуют в данных, выбрана дата created_date из блока last_status.
  4. Задача soft_delete — уже присутствующие в БД объекты и совпадающие со скачанными помечаются как удалённые.
  5. Задача load_actives — загружает, используя JSON-объект из S3 с активами, в базу данных.
  6. Задача month_dump — формирует выгрузку в формате ⬇️ csv и ⬇️ excel за последний скачанный месяц.

DAG собирающий вакансии университетов

Хорошо ❤️ реализованное и описанное HH API позволяет относительно легко и регулярно собирать вакансии компаний, которые используют данный HR-инструмент. На основании полученных данных, на достаточном промежутке времени, можно вполне «качать» какие-то гипотезы, связанные с организациями, попавшими в анализируемую выборку.

По 👉 ссылке можно ознакомиться с полным кодом DAG-а на GitHub, который собирает вакансии университетов связанные с программами «Приоритет 2030» и «Передовые инженерные школы».

Dashboard

На базе данных, получаемых с помощью этого DAG-а, построен 📊 публичный Dashboard в Datalens, на котором можно посмотреть 📈 общую картину по вакансиям и 🔍 поискать более конкретную вакансию в университетах.

Алгоритм сбора вакансий университета

Сбор вакансий по университетам осуществляется ⏰ один раз в неделю по выходным дням.

Обратите внимание

В работе DAG-а используются ручки HH API, которые не требуют авторизации. Но при первых попытках работы с API в больших объёмах (тысячи запросов), через какое-то время API начинало отвечать HTTP-кодом 401.

Чтобы снять данное ограничение, авторизуйтесь на портале для разработчиков и зарегистрируйте своё приложение. После одобрения приложения (по моему опыту, 1-2 недели), авторизуйте приложение (процедура простая, буквально один GET-запрос) и получите «токен приложения», который далее используйте при работе с API.

  1. Задача extract_university_list — извлекает из внутренней БД список университетов и их ID на HH. Альтернативой может быть данная ручка API проекта, где employer_id и есть упомянутый ID.
  2. Задача extract_lists_of_vacancies_by_university — извлекает из HH API список вакансий и сохраняет их на S3.
  3. Задача extract_vacancies_data — по сохранённому ранее в S3 списку вакансий, из HH API, загружаются подробные данные по каждой вакансии, при этом сохраняя всё получаемое на S3.
  4. Задача transform_vacancies — работая с S3, бежит по сформированному в предыдущем пункте массиву данных и формирует один большой JSON-объект, далее сохраняет его на S3.
  5. Задача soft_delete — помечает все вакансии, загруженные ранее, как удалённые.
  6. Задача load_vacancies — загружает, используя JSON-объект из S3 с вакансиями, в базу данных.
  7. Задача month_dump — формирует выгрузку в формате ⬇️ csv и ⬇️ excel за последний месяц.

Вспомогательный DAG по валютам

Из-за того, что заработные платы в вакансиях публикуются не только в рублях, пришлось реализовать DAG работающий с API Центрального банка 🇷🇺 РФ. С кодом DAG-а можно ознакомиться по 👉 ссылке.

Раз в день, ⏰ во второй половине дня, происходит следующее:

  1. Задача extract — обращается в API Центробанка и забираем данные по обменным курсам.
  2. Задача transform — трансформирует полученные данные в нужную нам структуру, в т.ч. приводит всё к 1 руб.
  3. Задача load — загружаем данные в БД.

Данный DAG очень простой, но используемые в нём данные обладают ретроспективой, что встречается не так часто. Если Вы только начинаете разбираться в Airflow, рекомендую попробовать повторить именно его для начала 👍

DAG работающий с Google Alerts

С помощью Google Alerts можно довольно легко отслеживать новостные потоки по конкретным ключевым словам. Вузам рекомендую отслеживать, как минимум, новостные потоки, связанные со своими названиями (официальные и устоявшиеся), иногда можно узнать много неожиданного и интересного об университете, сотрудниках и студентах.

По 👉 ссылке можно ознакомиться с полным кодом DAG-а на GitHub, который собирает публикации в интернете связанные с программами «Приоритет 2030» и «Передовые инженерные школы».

Dashboard

На базе данных, получаемых с помощью этого DAG-а, построен 📊 публичный Dashboard в Datalens.

Алгоритм сбора публикаций

DAG эксплуатирует RSS-каналы, создаваемые Google Alerts по выбранным запросам. В окружении Aiflow создана переменная с ключом google_alert, приблизительно следующего содержания:

Настройки DAG-а
[
{
"topic": "Приоритет 2030",
"feeds": [
{
"keyword": "приоритет 2030",
"url": "https://www.google.com/alerts/feeds/04816217329661954772/16039731801341369303"
},
{
"keyword": "программа стратегического академического лидерства",
"url": "https://www.google.com/alerts/feeds/04816217329661954772/3998321703702580804"
},
{
"keyword": "priority 2030",
"url": "https://www.google.com/alerts/feeds/04816217329661954772/7829070105676972604"
}
]
},
{
"topic": "Передовые инженерные школы",
"feeds": [
{
"keyword": "передовые инженерные школы",
"url": "https://www.google.com/alerts/feeds/04816217329661954772/7495754828704681196"
}
]
}
]

Далее ⏰ раз в день, во второй половине дня, когда Google Alert точно обновится, происходит следующее:

  1. Задача extract — пробегается по всем RSS потокам из настроек и сохраняет их в S3;
  2. Задача transform — извлекает RSS из S3, приводит их данные в необходимый вид, дополнительно пробует загрузить и сохранить первоисточник, и сохраняет получившуюся структуру в S3;
  3. Задача load — загружает данные из S3, формирует SQL-запрос и записывает данные в БД;
  4. Задача month_dump — формирует выгрузку в формате ⬇️ csv и ⬇️ excel за последний месяц.
Обратите внимание

Google Alerts будет собирать ссылки на источники по интересующим Вас запросам довольно «топорно», т.е. в данные будут прилетать ложноположительные источники. Почти всегда это связано с анонсами одной новости, где фигурирует отслеживаемый запрос, на страницах другой. По моему опыту, при правильно подобранных ключевых словах, это количество не превышает 10% от вполне корректного потока, что для бесплатного инструмента отлично 👍

Ссылки для изучения Apache Airflow

Скорей всего, даже для «попробовать», потребуются навыки DevOps-а 😎, в частности Docker и Docker Compose. Запасной, стартовый вариант, использовать готовую сборку Airflow из магазина Яндекс.Облака.

  1. Документация с официального сайта
  2. Книжка «Apache Airflow и конвейеры обработки данных» (сам читал 👌, возможно стоит с неё и начать)
  3. Русскоязычная группа в Телеграмм (ребята суровые, но помогут с конкретными вопросами по Airflow)