Etiqueta: Hadoop

Spark: Introducción al procesamiento distribuido

Vimos en algún post anterior que pasaba con Map-Reduce y como funciona. También hay un pequeño comentario al final diciendo que ya es «poco común encontrar personas que escriban tareas de Map Reduce a mano (en Java)» ya que existen servicios como Hive. Sin embargo, hay otros servicios, que también son distribuidos, pero no están basado en este algoritmo y que son utilizados para procesar datos en Big Data. El principal, es Apache Spark.

Spark comenzó como un proyecto de la Universidad de Berkeley hace ya varios años (2009 aproximadamente). Mas específicamente, escribieron un paper no sobre «Spark» sino mas bien sobre el core de Spark que son los Resilient Distributed Datasets (RDDs). Su objetivo era:

…to provide an abstraction that supports applications with working sets (i.e., applications that reuse an intermediate result in multiple parallel operations) while preserving the attractive properties of MapReduce and related models: automatic fault tolerance, locality-aware scheduling, and scalability. RDDs should be as easy to program against as data flow models, but capable of efficiently expressing computations with working sets.

Traduciendo, su intención era lograr que resultados parciales del procesamiento de los datos, pudiera ser reutilizado. Ergo, esto datasets se van almacenando en memoria y son reutilizados por el proceso sin tener que ir al Disco Rigido para almacenar resultados parciales como en Map Reduce.

Cuando se ejecuta una transformación que modifica un dataset, se crea uno nuevo. Se entiende por transformación, a métodos específicos de los RDD como GroupBy, map o filter. Por otro lado, los RDDs no se materializan en memoria hasta que se ejecuta una acción como por ejemplo collect, count o take. En este aspecto, podemos decir que Spark limita un poco nuestra capacidad de procesamiento a la memoria RAM que tengamos disponible, sin embargo, es extremadamente mas rápido que Map Reduce y ademas existen mecanismos o buenas practicas para evitar desbordar la memoria.

Spark fue principalmente pensado para realizar tareas de Analytics y es hoy el líder del procesamiento de datos en Big Data, tanto en proyectos full Hadoop on Premise, como en la nube de la mano de soluciones como Databricks.

La arquitectura de este sistema de procesamiento, es similar al esquema que utiliza HDFS en general. Se tiene un master, y se tienen nodos que hacen el trabajo pesado. Estos nodos deben, al menos, tener mucha memoria RAM disponible (o acorde para procesar el volumen de datos que se quiere procesar).

Sin embargo, no es todo color de rosa. Spark, es un poco mas sensible respecto a la tolerancia a fallos. Si un nodo de Spark cae durante el procesamiento, la tarea completa falla. Y este es justamente un punto donde Hive con Map-Reduce sigue siendo un poco mejor.

La siguiente vez, hablare un poco mas de la arquitectura y como se utiliza esta tecnología.

Hadoop: Performance – CODECs

Hadoop: Performance – CODECs

Si bien Hadoop esta pensado para grandes volúmenes de datos y para trabajar sobre hardware «commodity», esto no significa que tener nodos con discos llenos sea barato. Por eso, usualmente se utilizan algoritmos que comprimen los datos al almacenarlos. Si lo que hacemos es traer archivos y los almacenamos con ORC o Parquet, deberíamos estar en buenas condiciones. Sin embargo, estos algoritmos de compresión solo trabajan con los datos en formato «columna» o tipo «tabla» (por ejemplo al cargar un CSV).

Por otro lado, no podemos, por ejemplo, almacenar una imagen o un PDF entero con ORC (o al menos no es conveniente hacerlo) por lo que surge una nueva duda ¿Como comprimo estos archivos para ahorrar espacio y como afecta la performance?

Primero, existe un concepto de suma importancia sobre los algoritmos de compresión el cual se conoce en ingles como «Splittable». Que un algoritmo tenga la capacidad de ser «splittable» no es menor. Que no lo sea, implica que no puedo leer un bloque del archivo sin descomprimir el archivo ENTERO. Ergo, solo puedo leer los bloques del archivo comprimido con una sola tarea de MAP perdiendo todas las ventajas de las lecturas distribuidas.

Si el algoritmo SI es «Splittable» entonces SI puedo leer solo una parte del archivo sin descomprimirlo entero con múltiples tareas MAP.

Hadoop admite al menos 4 algoritmos de compresión básicos:

  • GZIP: el adoptado por el proyecto GNU no es splittable y puede implicar ciertos problemas de performance.
  • BZIP2: es el que mejor ratio de compresión tiene, pero también es el mas lento. Este CODE solo lo usaríamos en información que vamos a consultar con MUY BAJA frecuencia.
  • SNAPPY: Este algoritmo no comprime mucho sin embargo es bastante veloz. Lamentablemente no es «Splittable»
  • LZO: es similar a SNAPPY en términos del nivel de compresión y velocidad pero si es «Splittable» (en ciertos casos) ya que tiene la capacidad de almacenar ademas un indice que referencia a cada bloque comprimido.

Para trabajar con archivos de texto plano, LZO puede ser una buena opción. La recomendación que les puedo dar igualmente, es que vayan probando cada uno y vean cual les da mejor resultado. Para los datos mas viejos que tienen que estar disponibles, pueden usar BZIP2 siempre y cuando el cliente que consulte los datos este dispuesto a aceptar la demora. Sin embargo, podría suceder que BZIP2 aun así de mejores resultados que GZIP por ejemplo.

Otra recomendación, es que si van a utilizar alguno de estos algoritmos, es que usen las librerías nativas de los mismos donde sea posible (en vez de la de Java) ya que usualmente dan mejores resultados.

Existen tambien otras alternativas:

  • Por ejemplo antes de escribir el archivo, podríamos dividirlo por código nosotros en archivos mas chicos y comprimirlos luego (si hacen esto, tengan en cuenta el tamaño de un bloque de Hadoop).
  • Almacenar los datos sin comprimir (o al menos los que se consulten con mayor frecuencia).
  • Hacer uso de las «Sequence File» de Hadoop.

Esta ultima opción quedara para otra ocasión.

El trabajo de mejorar performance es como en cualquier motor de bases de datos. El primer paso es obtener métricas, luego hacer cambios, y luego obtener esas métricas de nuevo y evaluar. Es un trabajo demandante y que requiere constancia y ser metódico. Sin embargo, un trabajo bien hecho suele tener un impacto directo en los clientes finales y en mi experiencia personal, suelen ser fuente de una gran satisfacción.

¡Suerte en la odisea!

Map Reduce: Leyendo datos distribuidos

Map Reduce: Leyendo datos distribuidos

Ya contamos en el artículo anterior por qué nace HDFS. Google tenía una necesidad muy específica que lo llevo a diseñar al padre de HDFS llamado GFS (Google File System). Partiendo de la misma base, ahora podemos imaginarnos la siguiente situación:

«Tenemos un archivo, compuesto por bloques, y estos bloques se encuentran en al menos 3 computadoras distintas disponibles para ser leídos. Nuestro archivo contiene texto (una novela por ejemplo) y queremos contar las palabras del mismo».

Para simplificar, podemos ilustrarlo de la siguiente manera asumiendo que todo el texto que tenemos es «Hola, esto es SQAleTec y esto es un artículo sobre Map Reduce»:

Cada cilindro representa el almacenamiento de cada PC y cada cuadrado de color un bloque de datos. A simple vista vemos que cada bloque esta escrito 3 veces (siempre en PCs distintas).

Si queremos hacer un conteo de palabras, yo podría perfectamente contar las palabras de cada bloque en cualquiera de las PC. Pero ya que tenemos varios bloques y PCs con la misma data disponible, lo que podemos hacer, es que cada PC realice una parte de la tarea:

  • Le voy a pedir a la PC1 que me de el resultado del bloque gris
  • A la PC2 del bloque amarillo
  • Y a la PC3 del bloque naranja

Cada PC entonces respondera con el siguiente resultado parcial:

Esto se conoce como distribuir la carga de trabajo. Se crea un pequeño algoritmo que no hace otra cosa que encontrar las palabras dentro del bloque y asociarles el valor 1 para luego poder hacer un conteo (sumando los 1 ya que no se pueden sumar palabras). Respecto a la mejora en velocidad de hacer lo mismo en una sola PC, podríamos decir que estamos «triplicando la velocidad de lectura» y en cierta forma estamos en lo correcto. A este paso se lo conoce como MAP. Cada PC toma la tarea y la ejecuta localmente y responde con su porción de los datos.

Sin embargo, todavía nos falta algo, al menos la palabra «esto», «es» y «sobre» se repiten. Para esto, una de las 3 PC tomara el rol de recibir todos los resultados parciales y ejecutar la suma. Supongamos que la PC 1 recibe todo. Al ejecutar la suma, nos quedara el siguiente resultado:

Esta es la etapa de REDUCE. La etapa REDUCE, es siempre ejecutada por alguna de las PC que almacena los datos (se elige en el momento). Una de las ventajas de Map-Reduce (incluso respecto a su competencia como Spark) es que tiene tolerancia a fallos. Si una etapa MAP no responde porque una PC se desconecto o simplemente se rompió, HDFS es lo suficientemente inteligente para pedirle a otra PC (que tenga los mismo datos) que responda el pedido. En otras palabras, si una de las PC deja de responder, el sistema sigue funcionando (aunque probablemente un poco mas lento).

Map Reduce en realidad es un poco mas complejo que lo mostrado anteriormente. En realidad, existen 2 etapas mas conocidas como SHUFFLE y COMBINE sin embargo no es la función de este articulo entrar en este detalle. Eso lo dejare para otro articulo donde sea mas relevante a tareas de mantenimiento o mejora de performance.

Como comentario final, no esta de mas saber que ya prácticamente nadie en el año 2020 escribe tareas de Map-Reduce a mano. Antiguamente, en la primera versión de Hadoop, uno programaba Map Reduce en Java pero ya es muy raro encontrar alguien que lo haga y la razón principal es que realmente ya no tiene sentido. Existen otros servicios como Apache Hive, que están basados en Map Reduce (ejecutan Java debajo) pero se escribe el código enteramente en HQL (muy similar a SQL) el cual tiene una curva de aprendizaje mucho mas corta y tiene una excelente performance (comparativamente hablando).

Dudas o consultas pueden dejarlas en los comentarios.

HDFS: El núcleo de Hadoop

HDFS: El núcleo de Hadoop

Supongo que todos se preguntaron alguna vez ¿Por que Hadoop? Todo empezó hace muchos años atrás (año 2003) cuando Google libero un paper* sobre un filesystem que ellos mismos habían diseñado para responder a sus propias necesidades. El problema principal que tenía Google era como responder a la demanda de solicitudes y a las búsquedas que tenia que hacer de forma performante. Ya en esa época, el volumen de datos y consultas que tenia Google, era enorme para ese tiempo, por lo que las bases de datos tradicionales, no eran capaces de responder en tiempo y forma. Por lo tanto, crearon GFS, un filesystem distribuido.

Un archivo se compone de bloques, y ese bloque se escribe en el disco duro de una PC. Para leer ese archivo, se envía una solicitud al disco que «trae» ese bloque para que podamos leerlo ¿Pero que pasa si muchos intentan leer el mismo bloque? ¿Que pasa si alguien intentar actualizar el archivo mientras intentamos leerlo? Estas cuestiones llevan en realidad años de estudios que le competen a las personas que estudian sistemas, software, etc. y existen varias formas de solucionar este problema en una PC. Sin embargo, Google penso en algo distinto:

  • ¿Que tal si en vez de escribir en una PC escribo en varias al mismo tiempo?
  • ¿Que pasa si en vez de leer un archivo de un lugar con 1 tiempo de respuesta, lo leo de X cantidad de lugares?

De esta forma, nace el primer concepto de GFS (el papá por así decirlo de HDFS): Lecturas y escrituras distribuidas.

  • Un archivo se sigue almacenando en bloques, pero los bloques se guardan en varios discos almacenados en distintas PC (por default el mismo bloque existe exactamente igual en 3 PC distintas cada una con su propio almacenamiento).
  • De esta forma, para leer un archivo, puedo pedirle a 3 PC distintas, que cada una me responda con una parte del archivo y luego «pegar» los resultados y leer el archivo entero.

La implicancia inmediata de esto es la siguiente:

  • Las escrituras tardan mas tiempo.
  • Pero las lecturas son mucho mas rápidas.
  • Tenes alta disponibilidad inmediata: Si una PC cae, se puede seguir leyendo o escribiendo del resto y el sistema sigue respondiendo.

Sin embargo, para Google, las escrituras no eran un problema, porque ellos escaneaban, indexaban e iniciaban un nuevo proceso cada día siguiente agregando más datos, pero no modificando datos que ya estuvieran almacenados. Tanto GFS y HDFS son filesystem que están pensados solo para escribir y leer, no para actualizar datos (de hecho son bastante ineficientes para eso aunque existen algunas alternativas).

Otra característica importante de estos filesystem, es que estaban pensados para consultar y leer GRANDES VOLÚMENES de datos. Por lo tanto, tomaron otra decisión importante: Los bloques no son de 4kbytes como en el resto de los filesystem, en GFS un bloque pesa 64 MegaBytes (en HDFS es de 128 MB aunque se puede modificar). Esto agrega a su vez sus propias ventajas y desventajas:

  • Actualizar un bloque con nuevos datos es realmente problemático.
  • Pero la cantidad de solicitudes para leer un archivo son mucho menos (solo hago 1 llamado para leer 128 MB y espero que el disco responda en vez de enviar cientos de llamados para leer lo mismo).

Ahora debería empezar a entenderse hacia donde quería ir Google cuando creo GFS. Ellos querían un FileSystem con alta disponibilidad, que pudiera responder a consultas de una forma mucho más rápida aunque eso implique perder más tiempo escribiendo.

La segunda clave de este filesystem es como se ejecuta realmente una lectura distribuida. Aquí es donde entra el segundo tema importante para entender Hadoop y Big Data conocido como Map-Reduce introducido en otro paper del 2004** pero esto será introducido en el siguiente artículo.

*https://static.googleusercontent.com/media/research.google.com/es//archive/gfs-sosp2003.pdf
**https://static.googleusercontent.com/media/research.google.com/es//archive/mapreduce-osdi04.pdf