Как стать Data Engineer

Сейчас специализация в области data engineering активно набирает обороты. Судя по отчёту компании hired.com, спрос на data engineer специалистов вырос на 38%, и рост продолжится. Средняя зарплата у Data Engineer в Нью-Йорке составляет $132 тысячи, а в Сан-Франциско $151 тысячу. Что касается рынка СНГ, то спрос на дата инженеров только начинает расти. В России зарплатная вилка варьируется от 100 тысяч рублей до 250 тысяч. Эту информацию я получил из небольшого анализа открытых вакансий на ресурсах Мой Круг и HeadHunter.

Что такое Data Engineering

Из названия понятно, что область data engineering связана с данными, а именно с их доставкой, хранением и обработкой. Главная задача дата инженеров - обеспечить надёжную инфраструктуру для данных. Если обратиться к пирамиде AI, то data engineering занимает в ней первые 2-3 ступени: Collect, Move & Store, Data Preparation. Из этого следует вывод, что любой data-driven организации жизненно необходим data engineer, чтобы добраться до вершины. читать дальше

Работа с PostgreSQL в Python

PostgreSQL, пожалуй, это самая продвинутая реляционная база данных в мире Open Source Software. По своим функциональным возможностям она не уступает коммерческой БД Oracle и на голову выше собрата MySQL.

Если вы создаёте на Python веб-приложения, то вам не избежать работы с БД. В Python самой популярной библиотекой для работы с PostgreSQL является psycopg2. Эта библиотека написана на Си на основе libpq.

Установка

Тут всё просто, выполняем команду:

pip install psycopg2

Для тех, кто не хочет ставить пакет прямо в системный python, советую использовать pyenv для отдельного окружения. В Unix системах установка psycopg2 потребует наличия вспомогательных библиотек (libpq, libssl) и компилятора. Чтобы избежать сборки, используйте готовый билд:

pip install psycopg2-binary

Но для production среды разработчики библиотеки рекомендуют собирать библиотеку из исходников.

Начало работы

Для выполнения запроса к базе, необходимо с ней соединиться и получить курсор:

import psycopg2
conn = psycopg2.connect(dbname='database', user='db_user', 
                        password='mypassword', host='localhost')
cursor = conn.cursor()

Через курсор происходит дальнейшее общение в базой.

cursor.execute('SELECT * FROM airport LIMIT 10')
records = cursor.fetchall()
...
cursor.close()
conn.close()

После выполнения запроса, получить результат можно несколькими способами:

  • cursor.fetchone() — возвращает 1 строку
  • cursor.fetchall() — возвращает список всех строк
  • cursor.fetchmany(size=5) — возвращает заданное количество строк

Также курсор является итерируемым объектом, поэтому можно так:

for row in cursor:
    print(row)

Хорошей практикой при работе с БД является закрытие курсора и соединения. Чтобы не делать это самому, можно воспользоваться контекстным менеджером:


from contextlib import closing

with closing(psycopg2.connect(...)) as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT * FROM airport LIMIT 5')
        for row in cursor:
            print(row)

По умолчанию результат приходит в виде кортежа. Кортеж неудобен тем, что доступ происходит по индексу (изменить это можно, если использовать NamedTupleCursor). Если хотите работать со словарём, то при вызове .cursor передайте аргумент cursor_factory:

from psycopg2.extras import DictCursor
with psycopg2.connect(...) as conn:
    with conn.cursor(cursor_factory=DictCursor) as cursor:
        ...

Формирование запросов

Зачастую в БД выполняются запросы, сформированные динамически. Psycopg2 прекрасно справляется с этой работой, а также берёт на себя ответственность за безопасную обработку строк во избежание атак типа SQL Injection:

cursor.execute('SELECT * FROM airport WHERE city_code = %s', ('ALA', ))
for row in cursor:
    print(row)

Метод execute вторым аргументом принимает коллекцию (кортеж, список и т.д.) или словарь. При формировании запроса необходимо помнить, что:

  • Плейсхолдеры в строке запроса должны быть %s, даже если тип передаваемого значения отличается от строки, всю работу берёт на себя psycopg2.
  • Не нужно обрамлять строки в одинарные кавычки.
  • Если в запросе присутствует знак %, то его необходимо писать как %%.

Именованные аргументы можно писать так:

>>> cursor.execute('SELECT * FROM engine_airport WHERE city_code = %(city_code)s',
                   {'city_code': 'ALA'})
...

Модуль psycopg2.sql

Начиная с версии 2.7, в psycopg2 появился модуль sql. Его цель — упростить и обезопасить работу при формировании динамических запросов. Например, метод execute курсора не позволяет динамически подставить название таблицы.

>>> cursor.execute('SELECT * FROM %s WHERE city_code = %s', ('airport', 'ALA'))
psycopg2.ProgrammingError: ОШИБКА:  ошибка синтаксиса (примерное положение: "'airport'")
LINE 1: SELECT * FROM 'airport' WHERE city_code = 'ALA'

Это можно обойти, если сформировать запрос без участия psycopg2, но есть высокая вероятность оставить брешь (привет, SQL Injection!). Чтобы обезопасить строку, воспользуйтесь функцией psycopg2.extensions.quote_ident, но и про неё легко забыть.

from psycopg2 import sql
...
>>> with conn.cursor() as cursor:
        columns = ('country_name_ru', 'airport_name_ru', 'city_code')
        stmt = sql.SQL('SELECT {} FROM {} LIMIT 5').format(
            sql.SQL(',').join(map(sql.Identifier, columns)),
            sql.Identifier('airport')
        )
        cursor.execute(stmt)
        for row in cursor:
            print(row)
('Французская Полинезия', 'Матайва', 'MVT')
('Индонезия', 'Матак', 'MWK')
('Сенегал', 'Матам', 'MAX')
('Новая Зеландия', 'Матамата', 'MTA')
('Мексика', 'Матаморос', 'MAM')

Транзакции

По умолчанию транзакция создаётся до выполнения первого запроса к БД, и все последующие запросы выполняются в контексте этой транзакции. Завершить транзакцию можно несколькими способами:

  • закрыв соединение conn.close()
  • удалив соединение del conn
  • вызвав conn.commit() или conn.rollback()

Закрытие соединения через del или метод .close неявно приводит к rollback.

Старайтесь избегать длительных транзакций, ни к чему хорошему они не приводят. Для ситуаций, когда атомарные операции не нужны, существует свойство autocommit для connection класса. Когда значение равно True, каждый вызов execute будет моментально отражен на стороне БД (например, запись через INSERT).

with conn.cursor() as cursor:
    conn.autocommit = True
    values = [
        ('ALA', 'Almaty', 'Kazakhstan'),
        ('TSE', 'Astana', 'Kazakhstan'),
        ('PDX', 'Portland', 'USA'),
    ]
    insert = sql.SQL('INSERT INTO city (code, name, country_name) VALUES {}').format(
        sql.SQL(',').join(map(sql.Literal, values))
    )
    cursor.execute(insert)
читать дальше

Poetry: новый менеджер зависимостей в Python

В последнее время в экосистеме Python часто стали появляться инструменты для управления зависимостями. Оно понятно, стандартный pip уже не отвечает современным требованиям: неудобная работа с зависимостями, много ручной работы при подготовке пакетов, проблемы при установке и обновлении и много чего другого.

С недавних пор я начал использовать новый менеджер под названием Poetry. Именно о нём сегодня пойдёт речь.

Функциональные возможности Poetry:

  • Управление зависимостями через toml файл (прощай, requirements.txt)
  • Автоматическое создание изолированного виртуального окружения Python (теперь не нужно для этого вызывать virtualenv)
  • Удобное создание пакетов (отныне не нужно копипастить создавать setup.py каждый раз)
  • poetry.lock файл для фиксирования версий зависимостей

А особенно радует тандем при работе с pyenv. О pyenv я писал три года назад.

читать дальше

Авторизация через Telegram в Django и Python

Предисловие

В начале февраля Павел Дуров анонсировал, что у Telegram появился так называемый Telegram Login Widget. Проще говоря, теперь любой желающий может встроить авторизацию на своем сайте через Telegram, наряду с уже удобными способами входа через привычные для всех Google, Twitter, Facebook и так далее.

В этой заметке я хочу рассказать и наглядно показать как это сделать, используя Django. Исходный код свободно доступен в моем репозитории на GitHub. Пользуйтесь на здоровье.

Те, кто уже имеет опыт работы с Django Framework, знают, что он предоставляет готовую User модель для авторизации, более того, ее очень легко расширить или написать свою. Я решил пойти другим путём, дабы не городить своих велосипедов, воспользовался готовым и хорошо зарекомендовавшим себя пакетом для авторизации в различных сервисах — python-social-auth. Для тех, кто не в курсе, social-auth это набор готовых библиотек для авторизации через такие сайты как Twitter, Facebook, Instagram, Google, VK и многие других. Проект разбит на модули, где основным является social-core, там хранится набор т.н. бэкендов для авторизации. Помимо social-core, есть ряд пакетов для конкретных фреймворков. Для Django это social-app-django, для Tornado соответственно social-app-tornado и так далее. Покрыты практически все известные веб-фрейморки для Python. Так вот к чему это я?

Решил я внести свой вклад в развитие Open Source, и написал backend для авторизации через Telegram в основной модуль social-core. Готовый к слиянию Pull Request можно посмотреть здесь. Правда загвоздка в том, что автор проекта не спешит его сливать в master ветку, на сообщения не реагирует, поэтому пример кода будет базироваться на моем форке social-core. Будем надеяться, что PR примут. Вы можете внести свой вклад, оставив комментарий автору проекта в Issue, который я открыл по случаю готового PR.

Мой проект будет использовать Python 3.6 и Django 2.0.3. Если вам не нравится Django, то вы без труда можете использовать python-social-auth с любым другим фреймворком для реализации авторизации через Telegram для своего сайта.

читать дальше

Amazon Redshift и Python

Amazon Redshift & Python

Amazon Redshift это колоночная база данных от Amazon, способная хранить и обрабатывать петабайты данных. Она поддерживает диалект SQL, что значительно облегчает работу с данными, а также подключение сторонних Business Intelligence систем для последующего анализа. В основе Redshift лежит реляционная база данных PostgreSQL 8 версии.

Для работы с Amazon Redshift в экосистеме Python можно использовать тот же драйвер, что и для работы с PostgreSQL - psycopg2. Всё бы хорошо, но есть один нюанс. Если вы используете в работе Redshift, то наверняка хранимый объём информации превышает десятки терабайт данных. Эффективно обрабатывать такой объём данных при работе на Python затруднительно потому что курсор в psycopg2, отлично работающий в PostgreSQL, в Amazon Redshift не работает. Более подробное обсуждение проблемы можно найти здесь и тут.

Как же быть? Выход есть. Во-первых, можно использовать JDBC драйвер, но это прямой путь в Java. Но мы ведь любим Python, верно? Поэтому воспользуемся вторым вариантом, который рекомендует Amazon. Для выгрузки больших объёмов данных используйте операцию UNLOAD. Данные при выполнении этой операции выгружаются на S3 bucket очень быстро, при этом команда поддерживает ряд параметров, включая параллельное выполнение запроса, сжатие и шифрование. Я не буду вдаваться в подробное описание команды (читайте официальную документацию), а покажу как работу с выгруженными данными легко автоматизировать с помощью Luigi.

В пакете Luigi есть набор классов для работы с Redshift и Amazon, а именно RedshiftUnloadTask. На его основе можно построить datapipeline. Вот пример Task для работы с Redshift UNLOAD:

class RedshiftToS3Task(RedshiftUnloadTask):

    QUERY = """SELECT field1, field2 FROM table1"""
    host = env.str('REDSHIFT_ENDPOINT')
    user = env.str('REDSHIFT_USERNAME')
    password = env.str('REDSHIFT_PASSWORD')
    database = env.str('REDSHIFT_DBNAME')
    table = luigi.Parameter()
    file_prefix = luigi.Parameter()
    s3_bucket = luigi.Parameter()
    aws_access_key_id = env.str('S3_AWS_ACCESS_KEY')
    aws_secret_access_key = env.str('S3_AWS_SECRET_ACCESS_KEY')

    @property
    def s3_unload_path(self):
        return 's3://{}/unload/{}'.format(self.s3_bucket, self.file_prefix)

    @property
    def update_id(self):
        return hashlib.sha1(self.query().encode('utf-8')).hexdigest()

    def query(self):
        return self.QUERY.format(table=self.table)

    @property
    def unload_options(self):
        return "DELIMITER ';' GZIP ALLOWOVERWRITE PARALLEL OFF"

Названия свойств говорят сами за себя, поясню только про update_id. Оно необходимо, чтобы избежать повторного выполнения одного и того же запроса при повторном запуске Luigi скрипта.

Имея в наличии такой Task, несложно автоматизировать дальнейшую работу по скачиванию файла из S3 и последующего его анализа с помощью другого Task. Если незнакомы с Luigi, то читайте подробное описание инструмента и пример построения Datapipeline.

читать дальше

Агрегатор вакансий об удалённой работе

Представляю на обозрение мой новый небольшой проект задача которого собрать в сети лучшие предложения об удалённой работе в Интернете — Remotelist.ru. Мой личный опыт удалённой работы вот вот приблизится к 4-м годам, более того, я активно поддерживаю такой вид занятости потому что у него куда больше плюсов чем минусов. Обещаю следующий мой пост посвятить лайфхакам удалённой работы.

О чём этот проект? Если вы находитесь в активном поиске  работы, то вам нередко приходится мониторить множество предложений с различных сайтов: Мой Круг, Stackoverflow, VC.ru, HH.ru, LinkedIn и многих других. Задача моего сайта собрать как можно больше предложений об удалённой работе в одном месте и оповещать вас о них. Проще говоря, Remotelist это агрегатор вакансий. В планах у меня есть мысль реализовать функцию постинга вакансии, но она появится чуть позже при условии востребованности сайта 🚀.

Сейчас помимо самого сайта, кросс-постинг вакансий в виде дайджестов в автоматическом режиме публикуется и в телеграм-канал @remotelist каждые 3 часа, при условии наличия новых предложений разумеется.

Если звёзды сойдутся благоприятно для сайта, то он будет развиваться в сторону персонализации. Что это значит? Если вы читатель моего блога, то в курсе, что я активно изучаю тему машинного обучения, и этот проект неплохая возможность потренировать свои навыки. Будет реализован личный кабинет с возможностью подключения своего github/stackoverflow/linkedin/etc аккаунта. На основе информации о специалисте, сайт будет рекомендовать наиболее подходящие вакансии, тем самым экономя время и нервы при поиске. Поживём-увидим.

Если вас заинтересовал проект, то, пожалуйста, подпишитесь на телеграм-канал @remotelist, а также добавьте мой сайт в закладки 😄

читать дальше

Designing Data-Intensive Applications

Где-то в середине 2017 года на глаза мне попалась интересная книга издательства O’Reilly под названием “Designing Data-Intensive Applications”. В то время я активно искал информацию в сети на тему Data Engineering. Как оказалось, материала по теме не так много, поэтому книга оказалась для меня открытием. Что же такого примечательного в ней?

Тема “data engineering” заслуживает отдельного поста, который появится в ближайшее время как только соберусь с мыслями. Для меня работа с большими данными это в первую очередь фундаментальные знания об устройстве распределенных систем. Книга Designing Data-Intensive Applications поможет вам окунуться в эту тему с головой и послужит хорошим стартом. Здесь Martin Kleppmann простым языком и очень подробно рассказывает как устроены современные базы данных, как работают sql/nosql/newsql хранилища, в чем разница между B-Tree+ и LSM-Tree, а также достаточно подробное описание современных форматов кодирования данных Avro, Thrift, Protobuf.

Особенно полезна книга будет тем, кто хочет связать свою карьеру с распределенными системами. Автор даёт достаточно информации по таким темам как репликация, партицирование, работа транзакций, а также раскрывает проблемы, возникающие при взаимодействии удаленных узлов. Ну и конечно же куда нам без CAP-теоремы. В книге также упоминаются современные методы обработки данных — Batch Processing и Stream Processing. А в последней главе нас ждет небольшое лирическое отступление и размышления на тему будущего больших данных.

Новость о выходе перевода этой книги на русский язык в издательстве “Питер” меня порадовала хотя я давно купил бумажный оригинал. Такой материал должен быть доступен как можно большему количеству людей. Немного смущает перевод названия книги, в русском варианте оно воспроизведено как “Высоконагруженные приложения. Программирование, масштабирование, поддержка”.

Несмотря на то, что электронный вариант оригинала без труда можно отыскать в сети, я советую купить бумажную версию.  Это фундаментальная книга и с годами она не потеряет свою ценность, уверяю! Заказать бумажный вариант книги в оригинале можно на сайте Amazon, а русский перевод на сайте Ozon.

читать дальше

Машинное обучение и Big Data

Около месяца назад начал проходить сразу 2 специализации на платформе Coursera:

Последняя специализация, к слову, была запущена чуть больше месяца назад. На сегодня закончил по 1 курсу из каждой специализации, а именно вот эти:

Хочу поделиться мыслями о курсах. Начнём с первого.

Цель курса "Математика и Python для анализа данных" — обозначить необходимый набор навыков для успешного прохождения всей специализации. Здесь вы найдёте небольшое введение в язык Python и его инструменты для анализа данных: pandas, numpy, scipy, остальная же часть курса посвящена математическому аппарату, а именно темам из линейной алгебры (матрицы, векторы, векторное пространство), теории вероятностей и немного затронут математический анализ — предел и производная. Особенно понравился упор на прикладные задачи, т.е. изучая, например, матрицы или векторы, понимаешь как их применять для решения прикладных задач. Но курс всё таки требует некоторой предварительной подготовки по обозначенным мною темам.

Эти материалы возможно помогут при прохождении курса:

Big Data Essentials это первый из 5 курсов, посвященных "горячей" нынче теме про построение инфраструктуры для эффективного анализа данных. Анонс специализации я делал у себя в Telegram канале в первой половине октября. Тогда я только присматривался, в итоге решил проходить. Что из себя представляет первый курс? Это плавное введение в основные инструменты анализа больших данных — Apache Hadoop и Apache Spark. Из 6 недель курса, 2 недели исключительно практические, вообще практических заданий тут хватает. Материалы курса предполагают некоторый опыт программирования, от себя добавлю, что большим плюсом будет наличие навыка в функциональном программировании. К сожалению, есть и ряд минусов. Так как материал появился относительно недавно, в нём есть ошибки из-за которых я терял время. Во-первых, встречаются задания с ошибками в формулах и коде, частенько "валился" удалённый hadoop/spark кластер. Мой совет перед выполнением заданий — активно читайте форум, тестируйте код локально на небольших кусках данных, по возможности поднимите свой docker-контейнер с hadoop или spark (в материалах есть ссылка на контейнер, а на форуме инструкция по настройке). Ещё смущает дичайший русский акцент, иногда в речи встречаются неверно составленные предложения, но к этому можно привыкнуть.

Изучение продолжается. На очереди у меня "Обучение на размеченных данных" и "Big Data Analysis: Hive, Spark SQL, DataFrames and GraphFrames". Скучно точно не будет :)

читать дальше

Строим Data Pipeline на Python и Luigi

Data Pipeline

Введение

В эпоху data-intensive приложений рядовым разработчикам всё чаще приходится сталкиваться с задачами по обработке и анализу данных. Ещё десять лет назад данные большинства проектов могли уместиться на жестком диске одного компьютера в какой-нибудь реляционной базе данных типа MySQL. А задачи по извлечению и обработке хранящихся данных решались за счёт непростых (или простых) SQL запросов. С тех пор мир информационных технологий значительно поменялся. С приходом Internet of Things, мобильных телефонов и дешевого мобильного интернета, объем генерируемых данных вырос в десятки тысяч раз. Ежедневно в мире генерируются эксабайты данных. Анализировать такой поток информации вручную, а тем более извлекать полезные для бизнеса или науки данные, практически невозможно. Но технологии как и время не стоят на месте, появляются новые инструменты, наука двигает прогресс. Если вы хоть чуточку следите за новостями из мира высоких технологий, то фразы "биг дата", "машинное обучение", "глубокое обучение" вас не испугают. С приходом больших данных появились новые профессии и специализации такие как Data Scientist/Analyst (по-русски аналитик данных), Data Engineer. Задачи этих ребят тесно связаны с обработкой, анализом и хранением "нефти 21 века", т.е. информации. Но насколько эффективно они выполняются?

ETL

Аббревиатура ETL в последнее время часто мелькает в материалах, посвященных data-driven приложениям. Но не пугайтесь, это всего лишь набор из 3-х простых слов: Extract, Transform, Load. Ничего не напоминает? Тот, кто сталкивался с задачами по обработке данных не раз замечал паттерн в своих действиях, а именно:

  • сначала данные выгружаются (Extract) из какого-нибудь источника типа базы данных, внешнего сервиса (Facebook Ads, Google Analytics, Yandex Metrics) или, на худой конец, это могут быть логи вашего приложения (например, веб-сервера).

  • потом они преобразуются (Transform), скажем, необходимо сформировать сводную таблицу или провести сложный когортный анализ ваших пользователей.

  • и наконец загружаются (Load) для просмотра и дальнейшего анализа в базу данных или на какое-нибудь облако Amazon S3, не суть.

И как ни крути от этого не уйти. Чтобы данные проанализировать, их необходимо подготовить, иначе "мусор на входе — мусор на выходе". Процесс подготовки занимает львиную долю времени, отведенного на работу с данными. До 80% рабочего времени аналитик тратит на сбор и очистку. Поэтому от эффективности ETL-процесса зависит скорость и качество выполненной работы.

Перед тем как перейти к основной идеи этой статьи, я предлагаю кратко рассмотреть самый популярный на сегодняшний день метод построения ETL процесса в компании.

Внимание! На платформе Stepik я запустил полноценный курс по разработке дата-пайплайнов на Luigi. Luigi сильно недооценён, и к нему стоит присмотреться поближе. В курсе я рассказываю зачем нужны пайплайны, как их сделать надёжными и отказоустойчивыми. Всё это заправляется практическими примерами! Не забыл я и про тему деплоя, где затрагивается тема Docker контейнеров, а также облачный деплой дата-пайпланой в AWS с использованием таких технологий как AWS Fargate, Cloud Map, Elastic Container Service и другое.

Более подробно о курсе по ссылке: Введение в Data Engineering: дата-пайплайны

Серые будни работы с данными

Сложно спорить с утверждением, что Python твердо занял позицию lingua franca в задачах по анализу данных. О его взрывной популярности свидетельствует и недавний пост от ребят из Stack Overflow. Но что же предшествует магическому процессу извлечения ценной информации (непосредственному анализу) из гигабайт структурированных и неструктурированных данных? Сбор. Задачи по анализу данных славятся своими жесткими сроками ведь на их основе часто принимают ключевые бизнес-решения. Это сильно отражается на том как мы, разработчики, подходим к процессу написания скриптов. Нам не стыдно создавать скрипты-однодневки с хрупким кодом и горой "разбитых окон". Хорошо ещё, если в этой массе кода есть хоть какая-нибудь структура или модульность для дальнейшего переиспользования в других скриптах, но обычно и этого нет. Краткосрочный выигрыш в скорости оборачивается головными болями в долгосрочной перспективе. Код обрастает "техническим долгом", и им становится сложно управлять. Ниже пример ETL-скрипта с соблюдением принципа Single Responsibility в каждой функции:

def extract_important_data():
    pass

def clean_important_data():
    pass

def transform_important_data():
    pass

def join_important_data_with_another_important_data():
    pass

def load_to_db():
    pass

if __name__ == "__main__":
    extract_important_data()
    clean_important_data()
    transform_important_data()
    join_important_data_with_another_important_data()
    load_to_db()

Всё бы хорошо, но у такого подхода есть ряд проблем:

  • отсутствие хорошего и централизованного обработчика ошибок
  • проблема при управлении зависимостями (между функциями/классами)
  • восстановление в точке остановки скрипта вследствии ошибки (например, получили 500 ошибку от одного из API сервисов)
  • удобная и быстрая работа с командной строкой (при необходимости передавать в скрипт аргументы)

Все они решаемы, но нужно ли изобретать ещё один, когда он давно изобретен и на нём успешно "катаются" специалисты в индустрии?!

Luigi

Luigi это один из немногих инструментов в экосистеме Python для построения т.н. pipeline’ов или, по-простому, выполнения пакетных задач (batch jobs). Разработан был инженерами из Spotify. Мне он понравился за свою простоту и широкий спектр возможностей, а именно:

  • управление зависимостями между задачами

  • failover recovery, т.е. если в одной из задач произошла ошибка, не нужно перезапускать цепочку снова

  • центральный планировщик задач с веб-интерфейсом, статусом выполнения задач и трекингом ошибок

  • “батарейки” для работы с HDFS, S3, MySQL, PostgreSQL, Redis, MongoDB, Redshift и т.д.

  • удобное построение CLI (Command Line Interface), в нём очень удобно построена передача параметров из командной строки

Основными строительными блоками Luigi являются 3 объекта: Task, Target и Parameter. Последний используется для взаимодействия с командной строкой и поэтому опционален.  Чтобы установить Luigi достаточно выполнить:

pip install luigi

Task

Класс Task это основной блок, где происходит выполнение конкретного таска. Чтобы определить свою собственную задачу, необходимо создать класс, унаследованный от Task, и реализовать несколько методов. Зачастую переопределять нужно только 3 метода: run(), output(), requires().

На сайте с документацией к Luigi есть хорошая иллюстрация что из себя представляет каждый метод и класс в целом:

Task.run

Здесь выполняется вся логика вашей будущей задачи, например, скачивание или парсинг данных с внешнего источника, запрос в базу данных для извлечения информации и т.д. Если задача объёмная, то лучше разбить её на функции и вызывать их внутри метода run(), это поможет избежать путанницы в будущем.

Task.requires

Помните я говорил об управлении зависимостями? В методе requires() необходимо их перечислить. Зависимостями выступают другие luigi.Task классы. Чуть позже я покажу реальный пример задачи с зависимостями.

Task.output

Этот метод должен возвращать 1 или более Target объектов. Target объектом может быть файл на диске, файл внутри HDFS, S3 или файл, лежащий на удалённом FTP сервере и т.д.. В Luigi уже встроено множество полезных Target классов, поэтому ситуация, когда вам понадобится создавать свой, маловероятна. Полный список доступных Target классов смотрите на сайте.

Task.input

Этот метод не нужно переопределять. Он выступает "оберткой" над Task.requires и возвращает Target объекты, полученные от выполнения задач, определенных в Task.requires. Таким образом строится граф зависимостей, когда одна задача зависит от результата выполнения другой. Продемонстрирую на примере кода:

import luigi

class A(luigi.Task):

    def output(self):
        return luigi.LocalTarget('result.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write('Hello, Luigi!')

class B(luigi.Task):

    def requires(self):
        return A()

    def run(self):
        with self.input().open('r') as f:
            print(f.read())

if __name__ == '__main__':
    luigi.run()

Здесь таск B зависит от выполнения таска A, поэтому перед началом выполнения B выполнится A, результат которого вернётся при вызове метода B.input (объекта файла result.txt).

Target

Ранее я вкратце описал что из себя представляет объект Target и зачем он нужен. Здесь отмечу, что благодаря этому классу Luigi реализует механизм fault tolerance и свойство идемпотентности. Проще говоря, если ваш pipeline аварийно завершается где-то в середине выполнения задач, повторный запуск не приведёт к повторному запуску успешно завершившихся задач, выполнение начнется в месте аварийной остановки скрипта. Это достигается за счёт вызова метода exists() у Target класса.

Parameter

При создании ETL скриптов часто приходится писать код для работы с командной строкой, а именно уметь принимать и обрабатывать аргументы. Даже наличие в стандартной библиотеке Python модулей для работы с консолью не уменьшает количество boilerplate кода. Luigi решил эту проблему по-своему.

Чтобы принимать аргументы из командной строки достаточно присвоить переменной объект класса Parameter или его наследников на уровне класса.

class TaskA(luigi.Task):
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('{}.txt'.format(self.filename))

    def run(self):
        with self.output().open('w') as f:
            f.write('Hello, Luigi!')

Пример запуска такого скрипта:

python demo.py TaskA --filename helloworld --local-scheduler

Если в названии вашего параметра присутствует знак ‘_’, то в командной строке его необходимо заменить на ‘-’. То есть передача значения в переменную file_name из командной строки будет выглядеть как --file-name. Параметр --local-scheduler необходим для запуска Luigi без центрального планировщика, в режиме тестирования и разработки.

Luigid

Задача демона Luigi заключается в следующем:

  • Следить за выполнением задач, чтобы исключить ситуацию одновременного исполнения одного и того же таска

  • Визуализация работы скрипта: построение графа зависимостей, просмотр статусов у текущих задач, мониторинг ошибок

Ниже скриншот графа зависимостей внутри демона Luigi на примере простого скрипта о котором расскажу чуть ниже.

По умолчанию демон слушает 8082 порт и запускается командой:

luigid

Чтобы увидеть все доступные параметры запуска необходимо добавить --help.

Пример пайплайна

Человек лучше всего запоминает информацию на практических примерах, поэтому я придумал скрипт в задачу которого входит:

  • спарсить рейтинг фильмов по годам с сайта IMDB
  • сохранить результат каждого года в файл
  • объединить результаты всех лет в 1 файл и отсортировать фильмы по убыванию рейтинга и количества голосов

Вот как выглядит решение этой задачи в Luigi:

import csv

import luigi
from luigi.format import UTF8
import requests
import pandas as pd
from bs4 import BeautifulSoup

class AggregateMovieRatingTask(luigi.Task):
    years = luigi.ListParameter()

    def requires(self):
        return [GetMovieMetaDataTask(year) for year in self.years]

    def output(self):
        return luigi.LocalTarget('results.csv'.format(), format=UTF8)

    def run(self):
        data_frames = []

        for _input in self.input():
            with _input.open('r') as raw_file:
                data_frames.append(pd.read_csv(raw_file))

        df = pd.concat(data_frames)
        df = df.sort_values(['rating', 'votes'], ascending=[False, False])

        with self.output().open('w') as f:
            df[['title', 'rating', 'votes']].to_csv(f)

class GetMovieMetaDataTask(luigi.Task):
    year = luigi.Parameter()

    def get_movie_meta_data(self, film_div):
        title = film_div.h3.a.text
        rating = film_div.find('div', class_='ratings-imdb-rating')
        rating = rating.attrs['data-value'] if rating else 0
        votes = film_div.find('span', attrs={'name': 'nv'})
        votes = votes.attrs['data-value'] if votes else 0
        return {'title': title, 'rating': rating, 'votes': votes}

    def output(self):
        return luigi.LocalTarget('raw-{}.csv'.format(self.year), format=UTF8)

    def run(self):
        url = 'http://www.imdb.com/search/title?release_date={}'.format(self.year)
        response = requests.get(url, headers={
            'Accept-Language': 'ru-RU,ru;q=0.8,en-US;q=0.6,en;q=0.4'
        })
        response.raise_for_status()
        html = BeautifulSoup(response.text, 'html.parser')
        film_container = html.find_all('div', class_='lister-item mode-advanced')
        payload = [self.get_movie_meta_data(film) for film in film_container]

        with self.output().open('w') as csv_file:
            df = pd.DataFrame(payload)
            df.to_csv(csv_file)

if __name__ == '__main__':
    luigi.run()

Скачать скрипт можно по ссылке. Для корректной работы необходимо помимо luigi также установить requests, pandas и beautifulsoup4:

pip install requests, pandas, beautifulsoup4, luigi

Запускайте в терминале демон luigid, а сам скрипт вот таким образом:

python imdb_luigi_list_params.py AggregateMovieRatingTask --years [2013,2014,2015,2016,2017]

Отправной точкой будет класс AggregateMovieRatingTask которому передается список интересующих нас лет. В методе requires() определяется зависимость от GetMovieMetaDataTask, поэтому до тех пор пока не будет получен результат от GetMovieMetaDataTask, код в методе run() у класса AggregateMovieRatingTask не будет исполнен.

При удачном раскладе AggregateMovieRatingTask.input вернёт список, содержащий объекты LocalTarget, полученные от выполнения GetMovieMetaDataTask по каждому году. Дальше необходимо пробежаться по списку, сформировать DataFrame и отсортировать его по убыванию.

Полученных знаний достаточнот для построения сложных пайплайнов с зависимостями.

Ограничения Luigi

Как и у любого другого инструмента, у Luigi есть свои ограничения с которыми приходится мириться в зависимости от ситуаций.

  • Отсутствие механизма запуска задач по расписанию. Если такая потребность имеется, то можно использовать crontab.

  • Luigi не предназначен для real-time обработки, его стихия это batch processing.

  • Сложность масштабирования. Luigi не умеет распределять задачи между воркерами на разных узлах/нодах как это умеет делать Celery, используя единый брокер сообщений (например, Redis или RabbitMQ). Без серьёзного ручного вмешательства тут не обойтись.

Заключение

Моей главной задачей в статье было рассказать про основные возможности Luigi. Вероятно те из вас, кто до сих пор мучается с boilerplate кодом при написании ETL скриптов взглянут на свою работу иначе, и полученная информация сделает вашу работу эффективной и приятной. За более подробным описанием стоит сходить на сайт с документацией. Отмечу, что Luigi не единственный инструмент в своём роде, обратите внимание на продукт под названием Airflow, разработанный в стенах Airbnb и с недавних пор перешедший в "руки" Apache Foundation (на момент написания статьи проект находится в статусе "incubating").

Полезные ссылки

читать дальше

Обновляем подсистему Linux на Windows 10

Пару недель назад вышло обновление для ОС Windows 10 под кодовым названием Creators Update. Помимо ежегодных плюшек в пользовательском интерфейсе и улучшений в производительности, с этим обновлением также "прилетел" апдейт для подсистемы Linux внутри Windows 10. Год назад я уже писал о том как установить Ubuntu в Windows 10. На момент прошлой заметки, в Windows 10 была возможность включить полноценный дистрибутив Linux — Ubuntu версии 14.04 LTS. Всё бы хорошо, но 14.04 вышла 3 года назад, пора бы идти в ногу со временем и обновиться до более свежей версии (учитывая, что в 14.04 стоит Python аж версии 2.7.6). С приходом подсистемы Linux, таким разработчикам как я (любителям Windows) можно забыть про использование инструментов вроде Vagrant для унификации систем развертывания и разработки.

Итак, как же обновить Linux? Если у вас ещё не установлена подсистема Linux, но при этом обновление Creators Update уже стоит, то следуйте инструкциям из моей прошлогодней статьи, с одним лишь замечанием, что теперь Программы и компоненты спрятаны в раздел Приложения и возможности.

читать дальше