Введение в Apache Airflow

Также по теме Airflow:

Apache Airflow — это продвинутый workflow менеджер и незаменимый инструмент в арсенале современного дата инженера. Если смотреть открытые вакансии на позицию data engineer, то нередко встретишь опыт работы с Airflow как одно из требований к позиции.

Я разработал практический курс по Apache Airflow 2.0, он доступен на платформе StartDataJourney, создана она также мною. Приятного обучения - Apache Airflow 2.0: практический курс.

Airflow был разработан в 2014 году в компании Airbnb, автор Maxime Beauchemin. Позже инструмент был передан под опеку в организацию Apache, а в январе 2019 получил статус Top-Level проекта. В этой статье я расскажу про установку, настройку и запуск первого дата пайплайна средствами Apache Airflow. К слову, в 2017 году я уже писал про не менее классный и простой инструмент Luigi от компании Spotify. По своей сути эти два инструмента похожи — оба предназначены для запуска цепочек задач (дата пайплайнов), но есть у них и ряд различий о которых я говорил во время своего выступления на PyCON Russia 2019:

В этой статье я постараюсь рассказать о необходимом минимуме для работы с Airflow. Для начала давайте рассмотрим основные сущности инструмента.

читать дальше

Обзор Python 3.8

Релиз Python 3.8 намечен на октябрь 2019 года, но уже сейчас у каждого есть возможность пощупать набор новых фишек языка. Пока пишу этот пост, на официальном сайте доступна версия python 3.8b2.

Итак, что же нам готовит релиз грядущий?

f-string =

Теперь выводить debug информацию стало ещё проще и красивее. Достаточно в f-string передать знак =:

>> a = 123
>> b = 456
>> print(f'{a=} and {b=}')
a=123 and b=456

Assignment Expressions или :=, он же walrus operator

Легендарный PEP572 из-за которого Гвидо сложил полномочия диктатора Python. Оператор выполняет присваивание значения переменной в выражении. Например, вам хочется проверить есть ли ключ в словаре, а также присвоить значение этого ключа переменной. В версиях Python до 3.8 необходимо сделать:

читать дальше

Видео презентации ETL на Python

12 апреля 2019 года в городе Алматы прошла первая международная IT конференция, организованная объединенной компанией Колёса. Крыша. Маркет.

В этот раз мне удалось выступить в секции Data Science & Analytics в темой ETL на Python, или Построение идемпотентных дата пайплайнов. Цель доклада - познакомить слушателей с инструментами построения batch processing задач в экосистеме Python. В презентации я рассказал про две наиболее популярных тулзы: Luigi и Apache Airflow.

В видео проблематично разглядеть слайды, поэтому смотрите тут:читать дальше

Работа с MySQL в Python

Ранее я уже писал статью про работу с PostgreSQL из Python. Сегодняшний пост будет посвящен другой популярной базе данных MySQL. Мой путь в веб-программирование был классическим: PHP, MySQL и Apache. Среди php-разработчиков MySQL пользуется большей популярностью чем PostgreSQL, хотя последняя предоставляет функционал намного богаче. MySQL до сих пор остаётся лидером среди реляционных open source баз данных, поэтому давайте узнаем как с ней работать через Python.

Установка

В статье речь пойдёт про пакет PyMySQL, это реализация mysql-клиента на чистом Python, поэтому никакие дополнительный Си-библиотеки ставить не придётся. Пакет поддерживает работу с Python 2.7 и Python >= 3.5.

Для установки библиотеки выполняем стандартную команду:

pip install PyMySQL

читать дальше

Как стать Data Engineer

Сейчас специализация в области data engineering активно набирает обороты. Судя по отчёту компании hired.com, спрос на data engineer специалистов вырос на 38%, и рост продолжится. Средняя зарплата у Data Engineer в Нью-Йорке составляет $132 тысячи, а в Сан-Франциско $151 тысячу. Что касается рынка СНГ, то спрос на дата инженеров только начинает расти. В России зарплатная вилка варьируется от 100 тысяч рублей до 250 тысяч. Эту информацию я получил из небольшого анализа открытых вакансий на ресурсах Мой Круг и HeadHunter.

Что такое Data Engineering

Из названия понятно, что область data engineering связана с данными, а именно с их доставкой, хранением и обработкой. Главная задача дата инженеров - обеспечить надёжную инфраструктуру для данных. Если обратиться к пирамиде AI, то data engineering занимает в ней первые 2-3 ступени: Collect, Move & Store, Data Preparation. Из этого следует вывод, что любой data-driven организации жизненно необходим data engineer, чтобы добраться до вершины. читать дальше

Работа с PostgreSQL в Python

PostgreSQL, пожалуй, это самая продвинутая реляционная база данных в мире Open Source Software. По своим функциональным возможностям она не уступает коммерческой БД Oracle и на голову выше собрата MySQL.

Если вы создаёте на Python веб-приложения, то вам не избежать работы с БД. В Python самой популярной библиотекой для работы с PostgreSQL является psycopg2. Эта библиотека написана на Си на основе libpq.

Установка

Тут всё просто, выполняем команду:

pip install psycopg2

Для тех, кто не хочет ставить пакет прямо в системный python, советую использовать pyenv для отдельного окружения. В Unix системах установка psycopg2 потребует наличия вспомогательных библиотек (libpq, libssl) и компилятора. Чтобы избежать сборки, используйте готовый билд:

pip install psycopg2-binary

Но для production среды разработчики библиотеки рекомендуют собирать библиотеку из исходников.

Начало работы

Для выполнения запроса к базе, необходимо с ней соединиться и получить курсор:

import psycopg2
conn = psycopg2.connect(dbname='database', user='db_user', 
                        password='mypassword', host='localhost')
cursor = conn.cursor()

Через курсор происходит дальнейшее общение в базой.

cursor.execute('SELECT * FROM airport LIMIT 10')
records = cursor.fetchall()
...
cursor.close()
conn.close()

После выполнения запроса, получить результат можно несколькими способами:

  • cursor.fetchone() — возвращает 1 строку
  • cursor.fetchall() — возвращает список всех строк
  • cursor.fetchmany(size=5) — возвращает заданное количество строк

Также курсор является итерируемым объектом, поэтому можно так:

for row in cursor:
    print(row)

Хорошей практикой при работе с БД является закрытие курсора и соединения. Чтобы не делать это самому, можно воспользоваться контекстным менеджером:


from contextlib import closing

with closing(psycopg2.connect(...)) as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT * FROM airport LIMIT 5')
        for row in cursor:
            print(row)

По умолчанию результат приходит в виде кортежа. Кортеж неудобен тем, что доступ происходит по индексу (изменить это можно, если использовать NamedTupleCursor). Если хотите работать со словарём, то при вызове .cursor передайте аргумент cursor_factory:

from psycopg2.extras import DictCursor
with psycopg2.connect(...) as conn:
    with conn.cursor(cursor_factory=DictCursor) as cursor:
        ...

Формирование запросов

Зачастую в БД выполняются запросы, сформированные динамически. Psycopg2 прекрасно справляется с этой работой, а также берёт на себя ответственность за безопасную обработку строк во избежание атак типа SQL Injection:

cursor.execute('SELECT * FROM airport WHERE city_code = %s', ('ALA', ))
for row in cursor:
    print(row)

Метод execute вторым аргументом принимает коллекцию (кортеж, список и т.д.) или словарь. При формировании запроса необходимо помнить, что:

  • Плейсхолдеры в строке запроса должны быть %s, даже если тип передаваемого значения отличается от строки, всю работу берёт на себя psycopg2.
  • Не нужно обрамлять строки в одинарные кавычки.
  • Если в запросе присутствует знак %, то его необходимо писать как %%.

Именованные аргументы можно писать так:

>>> cursor.execute('SELECT * FROM engine_airport WHERE city_code = %(city_code)s',
                   {'city_code': 'ALA'})
...

Модуль psycopg2.sql

Начиная с версии 2.7, в psycopg2 появился модуль sql. Его цель — упростить и обезопасить работу при формировании динамических запросов. Например, метод execute курсора не позволяет динамически подставить название таблицы.

>>> cursor.execute('SELECT * FROM %s WHERE city_code = %s', ('airport', 'ALA'))
psycopg2.ProgrammingError: ОШИБКА:  ошибка синтаксиса (примерное положение: "'airport'")
LINE 1: SELECT * FROM 'airport' WHERE city_code = 'ALA'

Это можно обойти, если сформировать запрос без участия psycopg2, но есть высокая вероятность оставить брешь (привет, SQL Injection!). Чтобы обезопасить строку, воспользуйтесь функцией psycopg2.extensions.quote_ident, но и про неё легко забыть.

from psycopg2 import sql
...
>>> with conn.cursor() as cursor:
        columns = ('country_name_ru', 'airport_name_ru', 'city_code')
        stmt = sql.SQL('SELECT {} FROM {} LIMIT 5').format(
            sql.SQL(',').join(map(sql.Identifier, columns)),
            sql.Identifier('airport')
        )
        cursor.execute(stmt)
        for row in cursor:
            print(row)
('Французская Полинезия', 'Матайва', 'MVT')
('Индонезия', 'Матак', 'MWK')
('Сенегал', 'Матам', 'MAX')
('Новая Зеландия', 'Матамата', 'MTA')
('Мексика', 'Матаморос', 'MAM')

Транзакции

По умолчанию транзакция создаётся до выполнения первого запроса к БД, и все последующие запросы выполняются в контексте этой транзакции. Завершить транзакцию можно несколькими способами:

  • закрыв соединение conn.close()
  • удалив соединение del conn
  • вызвав conn.commit() или conn.rollback()

Старайтесь избегать длительных транзакций, ни к чему хорошему они не приводят. Для ситуаций, когда атомарные операции не нужны, существует свойство autocommit для connection класса. Когда значение равно True, каждый вызов execute будет моментально отражен на стороне БД (например, запись через INSERT).

with conn.cursor() as cursor:
    conn.autocommit = True
    values = [
        ('ALA', 'Almaty', 'Kazakhstan'),
        ('TSE', 'Astana', 'Kazakhstan'),
        ('PDX', 'Portland', 'USA'),
    ]
    insert = sql.SQL('INSERT INTO city (code, name, country_name) VALUES {}').format(
        sql.SQL(',').join(map(sql.Literal, values))
    )
    cursor.execute(insert)
читать дальше

Poetry: новый менеджер зависимостей в Python

В последнее время в экосистеме Python часто стали появляться инструменты для управления зависимостями. Оно понятно, стандартный pip уже не отвечает современным требованиям: неудобная работа с зависимостями, много ручной работы при подготовке пакетов, проблемы при установке и обновлении и много чего другого.

С недавних пор я начал использовать новый менеджер под названием Poetry. Именно о нём сегодня пойдёт речь.

Функциональные возможности Poetry:

  • Управление зависимостями через toml файл (прощай, requirements.txt)
  • Автоматическое создание изолированного виртуального окружения Python (теперь не нужно для этого вызывать virtualenv)
  • Удобное создание пакетов (отныне не нужно копипастить создавать setup.py каждый раз)
  • poetry.lock файл для фиксирования версий зависимостей

А особенно радует тандем при работе с pyenv. О pyenv я писал три года назад.

читать дальше

Авторизация через Telegram в Django и Python

Предисловие

В начале февраля Павел Дуров анонсировал, что у Telegram появился так называемый Telegram Login Widget. Проще говоря, теперь любой желающий может встроить авторизацию на своем сайте через Telegram, наряду с уже удобными способами входа через привычные для всех Google, Twitter, Facebook и так далее.

В этой заметке я хочу рассказать и наглядно показать как это сделать, используя Django. Исходный код свободно доступен в моем репозитории на GitHub. Пользуйтесь на здоровье.

Те, кто уже имеет опыт работы с Django Framework, знают, что он предоставляет готовую User модель для авторизации, более того, ее очень легко расширить или написать свою. Я решил пойти другим путём, дабы не городить своих велосипедов, воспользовался готовым и хорошо зарекомендовавшим себя пакетом для авторизации в различных сервисах — python-social-auth. Для тех, кто не в курсе, social-auth это набор готовых библиотек для авторизации через такие сайты как Twitter, Facebook, Instagram, Google, VK и многие других. Проект разбит на модули, где основным является social-core, там хранится набор т.н. бэкендов для авторизации. Помимо social-core, есть ряд пакетов для конкретных фреймворков. Для Django это social-app-django, для Tornado соответственно social-app-tornado и так далее. Покрыты практически все известные веб-фрейморки для Python. Так вот к чему это я?

Решил я внести свой вклад в развитие Open Source, и написал backend для авторизации через Telegram в основной модуль social-core. Готовый к слиянию Pull Request можно посмотреть здесь. Правда загвоздка в том, что автор проекта не спешит его сливать в master ветку, на сообщения не реагирует, поэтому пример кода будет базироваться на моем форке social-core. Будем надеяться, что PR примут. Вы можете внести свой вклад, оставив комментарий автору проекта в Issue, который я открыл по случаю готового PR.

Мой проект будет использовать Python 3.6 и Django 2.0.3. Если вам не нравится Django, то вы без труда можете использовать python-social-auth с любым другим фреймворком для реализации авторизации через Telegram для своего сайта.

читать дальше

Amazon Redshift и Python

Amazon Redshift & Python

Amazon Redshift это колоночная база данных от Amazon, способная хранить и обрабатывать петабайты данных. Она поддерживает диалект SQL, что значительно облегчает работу с данными, а также подключение сторонних Business Intelligence систем для последующего анализа. В основе Redshift лежит реляционная база данных PostgreSQL 8 версии.

Для работы с Amazon Redshift в экосистеме Python можно использовать тот же драйвер, что и для работы с PostgreSQL - psycopg2. Всё бы хорошо, но есть один нюанс. Если вы используете в работе Redshift, то наверняка хранимый объём информации превышает десятки терабайт данных. Эффективно обрабатывать такой объём данных при работе на Python затруднительно потому что курсор в psycopg2, отлично работающий в PostgreSQL, в Amazon Redshift не работает. Более подробное обсуждение проблемы можно найти здесь и тут.

Как же быть? Выход есть. Во-первых, можно использовать JDBC драйвер, но это прямой путь в Java. Но мы ведь любим Python, верно? Поэтому воспользуемся вторым вариантом, который рекомендует Amazon. Для выгрузки больших объёмов данных используйте операцию UNLOAD. Данные при выполнении этой операции выгружаются на S3 bucket очень быстро, при этом команда поддерживает ряд параметров, включая параллельное выполнение запроса, сжатие и шифрование. Я не буду вдаваться в подробное описание команды (читайте официальную документацию), а покажу как работу с выгруженными данными легко автоматизировать с помощью Luigi.

В пакете Luigi есть набор классов для работы с Redshift и Amazon, а именно RedshiftUnloadTask. На его основе можно построить datapipeline. Вот пример Task для работы с Redshift UNLOAD:

class RedshiftToS3Task(RedshiftUnloadTask):

    QUERY = """SELECT field1, field2 FROM table1"""
    host = env.str('REDSHIFT_ENDPOINT')
    user = env.str('REDSHIFT_USERNAME')
    password = env.str('REDSHIFT_PASSWORD')
    database = env.str('REDSHIFT_DBNAME')
    table = luigi.Parameter()
    file_prefix = luigi.Parameter()
    s3_bucket = luigi.Parameter()
    aws_access_key_id = env.str('S3_AWS_ACCESS_KEY')
    aws_secret_access_key = env.str('S3_AWS_SECRET_ACCESS_KEY')

    @property
    def s3_unload_path(self):
        return 's3://{}/unload/{}'.format(self.s3_bucket, self.file_prefix)

    @property
    def update_id(self):
        return hashlib.sha1(self.query().encode('utf-8')).hexdigest()

    def query(self):
        return self.QUERY.format(table=self.table)

    @property
    def unload_options(self):
        return "DELIMITER ';' GZIP ALLOWOVERWRITE PARALLEL OFF"

Названия свойств говорят сами за себя, поясню только про update_id. Оно необходимо, чтобы избежать повторного выполнения одного и того же запроса при повторном запуске Luigi скрипта.

Имея в наличии такой Task, несложно автоматизировать дальнейшую работу по скачиванию файла из S3 и последующего его анализа с помощью другого Task. Если незнакомы с Luigi, то читайте подробное описание инструмента и пример построения Datapipeline.

читать дальше

Агрегатор вакансий об удалённой работе

Представляю на обозрение мой новый небольшой проект задача которого собрать в сети лучшие предложения об удалённой работе в Интернете — Remotelist.ru. Мой личный опыт удалённой работы вот вот приблизится к 4-м годам, более того, я активно поддерживаю такой вид занятости потому что у него куда больше плюсов чем минусов. Обещаю следующий мой пост посвятить лайфхакам удалённой работы.

О чём этот проект? Если вы находитесь в активном поиске  работы, то вам нередко приходится мониторить множество предложений с различных сайтов: Мой Круг, Stackoverflow, VC.ru, HH.ru, LinkedIn и многих других. Задача моего сайта собрать как можно больше предложений об удалённой работе в одном месте и оповещать вас о них. Проще говоря, Remotelist это агрегатор вакансий. В планах у меня есть мысль реализовать функцию постинга вакансии, но она появится чуть позже при условии востребованности сайта 🚀.

Сейчас помимо самого сайта, кросс-постинг вакансий в виде дайджестов в автоматическом режиме публикуется и в телеграм-канал @remotelist каждые 3 часа, при условии наличия новых предложений разумеется.

Если звёзды сойдутся благоприятно для сайта, то он будет развиваться в сторону персонализации. Что это значит? Если вы читатель моего блога, то в курсе, что я активно изучаю тему машинного обучения, и этот проект неплохая возможность потренировать свои навыки. Будет реализован личный кабинет с возможностью подключения своего github/stackoverflow/linkedin/etc аккаунта. На основе информации о специалисте, сайт будет рекомендовать наиболее подходящие вакансии, тем самым экономя время и нервы при поиске. Поживём-увидим.

Если вас заинтересовал проект, то, пожалуйста, подпишитесь на телеграм-канал @remotelist, а также добавьте мой сайт в закладки 😄

читать дальше