Dask: una herramienta Pythonic para el procesamiento de datos

Dask: una herramienta Pythonic para el procesamiento de datos

La tecnología ha permitido que el mundo evolucione constantemente. Hemos pasado de utilizar tarjetas perforadas a realizar análisis de datos en cuestión de segundos. Esto generó un nuevo concepto de Big Data y una nueva rama de la informática centrada en la transformación de los datos a información funcional para la toma de decisiones.

Python es una de las herramientas preferidas por los científicos de datos, pero existen otras que igualmente permiten realizar análisis de datos, entre las más conocidas Pandas y Numpy. Sin embargo, ellas tienen limitaciones enormes con colecciones de datos (dataset). Dask, parte del ecosistema nativo de Python para la Ciencia de Datos, sí permite trabajar con diferentes tamaños de dataset.

Tipo de Dataset

Tamaño de Dataset

Procesamiento

Herramienta

Dataset Pequeño

Max. 4GB

RAM y Disco

Pandas y Numpy

Dataset Mediano

Max. 2TB

Disco

Dask o Spark

Dataset Grande

> 2TB

Cluster

Dask o Spark

Tabla 1. Tipos de dataset y herramientas recomendadas

A pesar de que existen datasets que no podemos procesar de forma directa con Pandas o Numpy, esto sí es viable con Dask, el cual gracias a su vínculo con Python nos permite seguir utilizando esas herramientas en nuestros proyectos de Análisis de Datos. Sin importar su tamaño, podemos analizar datasets medianos de forma local, pero se vuelve necesario un cluster para realizar análisis de grandes colecciones de datos.

En sus inicios (2014-2015), Dask nació como una herramienta para objetos Numpy distribuidos y así aprovechar las estaciones de trabajo en diferentes empresas. Desde entonces, ha tenido una evolución significativa hasta el proceso de integrar herramientas  como ciencia geográfica y de análisis de imágenes. Los esfuerzos para mantener la constante  evolución de Dask son enormes, apoyados por instituciones como la NASA, NVIDIA o Anaconda.

Las características de Dask son:

  • Permite trabajar con una amplia gama de tareas.
  • Programación dinámica de tareas.
  • Facilita la utilización de certificados TLS/SSL para autenticación y cifrado.
  • Agilidad para desplegarlo en una computadora o cluster.
  • Trabaja confiablemente en un cluster de hasta 10k núcleos.
  • Machine Learning.
Imagen 1. Flujo de trabajo de Dask. Fuente: Documentación de Dask

Por otra parte, Spark es una herramienta muy similar a Dask desarrollada en Scala (una variante de Java). Actualmente, recibe mejoras de código por parte de la comunidad y forma parte de Apache Software Foundation. Esta herramienta se puede utilizar en diferentes lenguajes de programación, siendo los más utilizados Scala y Java —con el interés aprovechar todo el performance y nuevas características que nos brinda—, pero podemos comunicarnos con el cluster de Spark desde otros lenguajes como SQL, R, F#, C# y, por supuesto, Python.

Para comunicarnos con Apache Spark desde Python debemos utilizar la API llamada PySpark. Como con casi todas las herramientas, si no se utiliza su ambiente nativo podemos lograr resultados interesantes en el inicio del proyecto, pero con el tiempo utilizar una API para comunicarnos pasaría factura y terminaríamos encerrados en un cascarón al no poder utilizar todo el potencial de la herramienta o simplemente esperando que la API se actualice para aprovechar los nuevos features.

Spark y Dask tienen funcionalidades similares como:

  • Ejecución en Paralelo: nos permite trabajar con clusters
  • Ejecución “perezosa”: que trabaja directamente con task graphs, programadores o schedulers.
  • Dask nos ofrece más herramientas que Spark, pues permite realizar pruebas de desarrollo en la computadora sin necesidad de un cluster, así como integración nativa con Scikit-learn para Machine Learning y la herramienta de Joblib para trabajar las funciones por medio de pipelines.

Dask no es lo mismo que Pandas. A pesar de que pueden trabajar de la mano, existen ciertas diferencias que pueden pesar al momento de desarrollo. ¿Cuáles? Bien, los objetos en Dask son inmutables, por lo cual no podemos utilizar de manera directa operaciones. Para solucionar esto, puedes migrar el objeto Dask a Pandas y trabajar tranquilamente o hacer uso del método map_partitions, que permite operaciones Pandas en cada una de las particiones.

La forma de trabajo “pythónica” de Dask hace posible trabajar de forma fácil y ágil en entornos locales y entornos distribuidos, sin necesidad de instalar herramientas no pythónicas como Java, Scala y Spark cluster para ejecutar tu código. Esta es una ventaja enorme ya que no es necesario tener conocimientos fuera del mundo Python para realizar procesos de Ciencias de Datos.

Va una pequeña demostración de la simplicidad de Dask. En este caso, utilizaremos un archivo en Amazon Web Service S3, así como un archivo JSON Raw. Esto significa que no se espera una estructura JSON como tal. Este archivo tiene 5 millones de registros y 20 columnas, por lo que, de momento, solo consumiremos la información sin transformarla o limpiarla.

¿Qué necesitas?

  • Instalación completa de Dask (Pip install “dask[complete]”).
⚠️
Nota: Si solo deseamos instalar Dask dag, podemos hacerlo cambiando la palabra complete por dag.

  • Instalación de la librería de S3 (Pip install s3fs).
⚠️
Nota: solo aplica para la prueba con data en S3. Si se realiza de forma local, se puede omitir esta instalación.

Para el consumo del archivo JSON desde S3 utilizaré la herramienta Colab de Google, por lo que, en esta ocasión, no utilizaré un clúster.

import json

from dask import bag as db

from dask import dataframe as dd

S3_URI_FILE = "s3://your_S3_URI/your_file.json"

S3_KEY = “Your_Key”

S3_SECRET_KEY = “Your Secrete KEY”



%%time

ddf = db.read_text(S3_URI_FILE, storage_options = {'key': S3_KEY, 'secret': S3_SECRET_KEY}).map(json.loads)

Resultado:

CPU times: user 954 µs, sys: 0 ns, total: 954 µs

Wall time: 947 µs

%%time

data_frame = ddf.to_dataframe()

data_frame_pd = ddf.compute()

Resultado:

CPU times: user 2.91 s, sys: 679 ms, total: 3.59 s

Wall time: 6.18 s

Código para probarlo de forma local:

import json

import time

from dask import bag as db

from dask import dataframe as dd

start_time = time.time()

ddf = db.read_text(“Your_json_File.json").map(json.loads)

data_frame = ddf.to_dataframe()

print(f'Time to get data and make a dask dataframe is: {time.time()-start_time}')

start_time_2 = time.time()

data_frame_pd = data_frame.compute()

print(f'time to convert dask  dataframe to pandas dataframe: {time.time()-start_time_2}')

start_time_3 = time.time()

data_frame.visualize()

print(f'time to visualize graphs is: {time.time()-start_time_3}')

print(f'dask task to execute was: {time.time()-start_time}')

Resultado:

Time to get data and make a dask dataframe is: 0.7774131298065186

time to convert dask  dataframe to pandas dataframe: 39.88171863555908

time to visualize graphs is: 0.8343448638916016

dask task to execute was: 41.49371099472046

Si deseas hacer pruebas o llevarlo a implementación de forma local con un cluster, puedes utilizar Docker. Aquí encontrarás la documentación oficial.

Saludos

⚠️
Las opiniones y comentarios emitidos en este artículo son propiedad única de su autor y no necesariamente representan el punto de vista de Revelo.

Revelo Content Network da la bienvenida a todas las razas, etnias, nacionalidades, credos, géneros, orientaciones, puntos de vista e ideologías, siempre y cuando promuevan la diversidad, la equidad, la inclusión y el crecimiento profesional de los profesionales en tecnología.