Etiqueta: Snowflake

Snowflake: Como cargar datos desde AWS S3

Snowflake: Como cargar datos desde AWS S3

Actualmente estoy en un proyecto donde extraemos archivos de distintas fuentes de datos, los enviamos a s3 y luego cargamos los datos a Snowflake. Si pueden evitar usar codigo Python+Pandas, yo preferiria utilizar directamente el comando COPY (o Snowpipes) de Snowflake (que ya esta optimizado para hacer ese bulk insert), pero, segun entiendo, solo lo pueden hacer si Snowflake esta en la misma cuenta donde se encuentran los buckets. Como este no era el caso, ya que nuestros sistemas estan en distintas cuentas de AWS (manejadas todas con Tower Control), me veo obligado a manejar achivos en distintas cuentas de AWS con Snowflake configurado en una cuenta completamente distinta.

Para simplificar la carga, decidi crear un script desde el cual pueda cargar datos desde cualqueir sistema. Para empezar, extraigo los datos en formato CSV ¿Por que CSV? Es el formato recomendado por Snowflake. Les recomiendo revisar los siguientes links:

Haciendo un resumen de las recomendaciones, lo que tienen que tener en cuenta es lo siguiente:

  • Los archivos en lo posible deberian estar comprimidos (gzip o similar)
  • Deben ser mas grandes o iguales a 10 MB (siempre que sea posible)
  • No deberian superar los 250 o 300 MB (pueden ser un poco mas grandes, pero no nunca mayores a unos pocos GB).
  • El formato recomendado es CSV aunque puede procesar otros formatos bastante bien.

Primero hay que agregar los imports:

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
import boto3

Para leer un archivo en S3 debemos hacer lo siguiente:

session = boto3.session.Session()
client = session.client('s3')
csv_object = client.get_object(Bucket=<s3_bucket>, Key=<s3_key>)

with pd.read_csv(csv_object.get("Body"), chunksize=<chunksize>) as reader:
for chunk in reader:
print(chunk.to_string())

De esta forma podemos leer un archivo cada un <chunksize> o cantidad de filas (recomendado para no quedarse sin memoria). En este caso, cada ‘chunk’ es un dataframe de pandsa. Luego, tenemos que crear la conexion a snowflake:

ctx = snowflake.connector.connect(user='<user>',
                                               password='<password>',
                                               account='<account>',
                                               warehouse='<warehouse>',
                                               database='<database>')
cursor = ctx.cursor()

NOTA: No dejen sus passwords directamente en el codigo y menos aun si estan usando Git. Pueden almacenar la informacion en algun archivo seguro o con algun servicio como AWS Secret Manager.

Una vez que la conexion a Snowflake esta abierta, yo corro una serie de consultas para definir el contexto y donde estoy ejecutando las queries:

cursor.execute('USE ROLE <my_role>')
cursor.execute('USE DATABASE <my_db>')
cursor.execute('USE WAREHOUSE <my_compute>')
cursor.execute('USE SCHEMA <my_schema>')

Una vez que el contexto esta listo, puedo escribir los chunks en snowflake:

        with pd.read_csv(csv_object.get("Body"), chunksize=chunksize) as reader:
            for chunk in reader:
                chunk.to_string()
                chunk.columns = chunk.columns.str.upper()
                try:
                    write_pandas(ctx, chunk, <table_name>)
                except Exception as e:
                    print("There was an error while loading certain fields")
                    print(e)

IMPORTANTE: Las columnas del dataframe tienen que estar en uppercase y el dataframe en si mismo tiene que tener el mismo formato que la tabla. Es posible que tengan problemas con las fechas (¿quien no tiene problemas con las fechas?). En ese caso, pueden recorrer el dataframe y corregir las columnas antes de cargarlo a Snowflake. Por ejemplo, podemos convertir a ‘datetime’ las columnas que contengan ‘_DATE’ o ‘DATE_’ en el nombre:

        for col in chunk.columns:
            if ('_DATE' in col or 'DATE_' in col):
                df[col] = pd.to_datetime(df[col], format='%Y-%m-%d %H:%M:%S.%f')
                df[col] = df[col].dt.tz_localize('UTC')

Cualquier duda o comentario pueden dejarlo debajo.