Amazon Redshift и Python

Amazon Redshift & Python

Amazon Redshift это колоночная база данных от Amazon, способная хранить и обрабатывать петабайты данных. Она поддерживает диалект SQL, что значительно облегчает работу с данными, а также подключение сторонних Business Intelligence систем для последующего анализа. В основе Redshift лежит реляционная база данных PostgreSQL 8 версии.

Для работы с Amazon Redshift в экосистеме Python можно использовать тот же драйвер, что и для работы с PostgreSQL - psycopg2. Всё бы хорошо, но есть один нюанс. Если вы используете в работе Redshift, то наверняка хранимый объём информации превышает десятки терабайт данных. Эффективно обрабатывать такой объём данных при работе на Python затруднительно потому что курсор в psycopg2, отлично работающий в PostgreSQL, в Amazon Redshift не работает. Более подробное обсуждение проблемы можно найти здесь и тут.

Как же быть? Выход есть. Во-первых, можно использовать JDBC драйвер, но тогда путь в Java вам заказан. Но мы ведь любим Python, верно? Поэтому воспользуемся вторым вариантом, который рекомендует Amazon. Для выгрузки больших объёмов данных используйте операцию UNLOAD. Данные при выполнении этой операции выгружаются на S3 bucket очень быстро, при этом команда поддерживает ряд параметров, включая параллельное выполнение запроса, сжатие и шифрование. Я не буду вдаваться в подробное описание команды (читайте официальную документацию), а покажу как работу с выгруженными данными легко автоматизировать с помощью Luigi.

В пакете Luigi есть набор классов для работы с Redshift и Amazon, а именно RedshiftUnloadTask. На его основе можно построить datapipeline. Вот пример Task для работы с Redshift UNLOAD:

class RedshiftToS3Task(RedshiftUnloadTask):
    QUERY = """SELECT field1, field2 FROM table1"""
    host = env.str('REDSHIFT_ENDPOINT')
    user = env.str('REDSHIFT_USERNAME')
    password = env.str('REDSHIFT_PASSWORD')
    database = env.str('REDSHIFT_DBNAME')
    table = luigi.Parameter()
    file_prefix = luigi.Parameter()
    s3_bucket = luigi.Parameter()
    aws_access_key_id = env.str('S3_AWS_ACCESS_KEY')
    aws_secret_access_key = env.str('S3_AWS_SECRET_ACCESS_KEY')

    @property
    def s3_unload_path(self):
        return 's3://{}/unload/{}'.format(self.s3_bucket, self.file_prefix)

    @property
    def update_id(self):
        return hashlib.sha1(self.query().encode('utf-8')).hexdigest()

    def query(self):
        return self.QUERY.format(table=self.table)

    @property
    def unload_options(self):
        return "DELIMITER ';' GZIP ALLOWOVERWRITE PARALLEL OFF"

Названия свойств говорят сами за себя, поясню только про update_id. Оно необходимо, чтобы избежать повторного выполнения одного и того же запроса при повторном запуске Luigi скрипта.

Имея в наличии такой Task, несложно автоматизировать дальнейшую работу по скачиванию файла из S3 и последующего его анализа с помощью другого Task. Если незнакомы с Luigi, то читайте подробное описание инструмента и пример построения Datapipeline.

Интересные записи: