Exxact / Aprendizaje profundo, HPC, AV, Distribución y más

¿Qué es Apache Spark?

Apache Spark es una de las nuevas tendencias más candentes en el dominio de la tecnología. Es el marco con probablemente el mayor potencial para realizar el fruto del matrimonio entre Big Data y Aprendizaje Automático.

Se ejecuta rápidamente (hasta 100 veces más rápido que Hadoop MapReduce tradicional debido al funcionamiento en memoria, ofrece objetos de datos robustos, distribuidos y tolerantes a fallos (llamados RDD) y se integra perfectamente con el mundo del aprendizaje automático y el análisis de gráficos a través de paquetes complementarios como Mlib y GraphX.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-288x300.png

Spark está implementado en Hadoop / HDFS y escrito principalmente en Scala, un lenguaje de programación funcional, similar a Java. De hecho, Scala necesita la última instalación de Java en su sistema y se ejecuta en JVM. Sin embargo, para la mayoría de los principiantes, Scala no es un lenguaje que aprendan primero para aventurarse en el mundo de la ciencia de datos. Afortunadamente, Spark proporciona una maravillosa integración de Python, llamada PySpark, que permite a los programadores de Python interactuar con el framework de Spark y aprender a manipular datos a escala y trabajar con objetos y algoritmos en un sistema de archivos distribuido.

En este artículo, aprenderemos los conceptos básicos de PySpark. Hay muchos conceptos (en constante evolución e introducción), y por lo tanto, solo nos centramos en los fundamentos con unos pocos ejemplos simples. Se anima a los lectores a basarse en ellos y explorar más por su cuenta.

La corta historia de Apache Spark

Apache Spark comenzó como un proyecto de investigación en el AMPLab de la Universidad de California en Berkeley en 2009 y fue de código abierto a principios de 2010. Fue un proyecto de clase en UC Berkeley. La idea era construir un marco de gestión de clústeres, que pueda soportar diferentes tipos de sistemas de computación de clústeres. Muchas de las ideas detrás del sistema se presentaron en varios documentos de investigación a lo largo de los años. Después de su lanzamiento, Spark se convirtió en una amplia comunidad de desarrolladores y se mudó a la Apache Software Foundation en 2013. Hoy en día, el proyecto es desarrollado en colaboración por una comunidad de cientos de desarrolladores de cientos de organizaciones.

Spark no es un lenguaje de programación

Una cosa a recordar es que Spark no es un lenguaje de programación como Python o Java. Es un motor de procesamiento de datos distribuido de uso general, adecuado para su uso en una amplia gama de circunstancias. Es particularmente útil para el procesamiento de big data tanto a escala como a alta velocidad.

Los desarrolladores de aplicaciones y los científicos de datos generalmente incorporan Spark en sus aplicaciones para consultar, analizar y transformar datos rápidamente a escala. Algunas de las tareas que se asocian con mayor frecuencia con Spark incluyen, – trabajos por lotes ETL y SQL en grandes conjuntos de datos (a menudo de tamaño terabytes), – procesamiento de datos de transmisión desde dispositivos y nodos de IoT, datos de varios sensores, sistemas financieros y transaccionales de todo tipo, y – tareas de aprendizaje automático para aplicaciones de comercio electrónico o TI.

En su núcleo, Spark se basa en el marco Hadoop/HDFS para manejar archivos distribuidos. Se implementa principalmente con Scala, una variante de lenguaje funcional de Java. Hay un motor de procesamiento de datos Spark central, pero además de eso, hay muchas bibliotecas desarrolladas para el análisis de consultas de tipo SQL, el aprendizaje automático distribuido, la computación de gráficos a gran escala y el procesamiento de datos en streaming. Spark admite varios lenguajes de programación en forma de bibliotecas de interfaz sencillas: Java, Python, Scala y R.

Spark Utiliza el paradigma MapReduce para el Procesamiento Distribuido

La idea básica del procesamiento distribuido es dividir los fragmentos de datos en pequeños fragmentos manejables (incluidos algunos filtros y ordenamientos), acercar el cálculo a los datos, es decir, usar nodos pequeños de un clúster grande para trabajos específicos y luego volver a combinarlos. La porción de división se llama acción ‘Map’ y la recombinación se llama acción ‘Reducir’. Juntos, hacen el famoso paradigma de ‘MapReduce’, que fue introducido por Google alrededor de 2004 (ver el artículo original aquí).

Por ejemplo, si un archivo tiene 100 registros que procesar, 100 mapeadores pueden ejecutarse juntos para procesar un registro cada uno. O tal vez 50 mapeadores pueden correr juntos para procesar dos registros cada uno. Después de que todos los mapeadores completen el procesamiento, el marco baraja y clasifica los resultados antes de pasarlos a los reductores. Un reductor no puede arrancar mientras un mapeador todavía está en progreso. Todos los valores de salida de mapa que tienen la misma clave se asignan a un único reductor, que luego agrega los valores para esa clave.

PE_TITAN-RTX-Blog-1024x127.jpg

Cómo configurar PySpark

Si ya está familiarizado con Python y bibliotecas como Pandas y Numpy, PySpark es una gran extensión/marco para aprender a crear análisis y canalizaciones más escalables y con uso intensivo de datos utilizando la potencia de Spark en segundo plano.

El proceso exacto de instalación y configuración del entorno PySpark (en una máquina independiente) es algo complicado y puede variar ligeramente según el sistema y el entorno. El objetivo es conseguir que su entorno de ciencia de datos de Jupyter normal funcione con Spark en segundo plano utilizando el paquete PySpark.

Este artículo sobre Medium proporciona más detalles sobre el proceso de configuración paso a paso.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-Setup.png

Alternativamente, puede usar la configuración de Databricks para practicar Spark. Esta empresa fue creada por los creadores originales de Spark y tiene un excelente entorno listo para el lanzamiento para realizar análisis distribuidos con Spark.

Pero la idea es siempre la misma. Está distribuyendo (y replicando) su conjunto de datos grande en pequeños trozos fijos en muchos nodos. A continuación, acerque el motor de cómputo a ellos para que toda la operación sea paralela, tolerante a fallos y escalable.

Al trabajar con PySpark y Jupyter notebook, puede aprender todos estos conceptos sin gastar nada en la plataforma AWS o Databricks. También puede interactuar fácilmente con SparkSQL y MLlib para la manipulación de bases de datos y el aprendizaje automático. Será mucho más fácil comenzar a trabajar con clústeres grandes de la vida real si ha interiorizado estos conceptos de antemano.

Conjunto de datos distribuido Resiliente (RDD) y SparkContext

Muchos programas Spark giran en torno al concepto de conjunto de datos distribuido resiliente (RDD), que es una colección de elementos tolerantes a fallos que se pueden operar en paralelo. SparkContext reside en el programa de controladores y administra los datos distribuidos en los nodos de trabajo a través del administrador de clúster. Lo bueno de usar PySpark es que toda esta complejidad de la partición de datos y la administración de tareas se maneja automáticamente en la parte posterior y el programador puede centrarse en el trabajo específico de análisis o aprendizaje automático en sí.

Ejemplos de beneficios de usar Apache Spark con PySpark en Python-Rdd-1.png

rdd-1

Existen dos formas de crear RDD: paralelizar una colección existente en el programa de controladores o hacer referencia a un conjunto de datos en un sistema de almacenamiento externo, como un sistema de archivos compartido, HDFS, HBase o cualquier fuente de datos que ofrezca un formato de entrada Hadoop.

Para la ilustración con un enfoque basado en Python, daremos ejemplos del primer tipo aquí. Podemos crear un array Python simple de 20 enteros aleatorios (entre 0 y 10), usando Numpy random.randint (), y luego cree un objeto RDD de la siguiente manera,

from pyspark import SparkContextimport numpy as npsc=SparkContext(master="local")lst=np.random.randint(0,10,20)A=sc.parallelize(lst)

Observe el ‘4’ en el argumento. Indica 4 núcleos de cómputo (en su máquina local) que se utilizarán para este objeto SparkContext. Si comprobamos el tipo del objeto RDD, obtenemos lo siguiente,

type(A)>> pyspark.rdd.RDD

Frente a la paralelización está la colección (con collect()) que trae todos los elementos distribuidos y los devuelve al nodo principal.

A.collect()>> 

Pero A ya no es una matriz Numpy simple. Podemos usar el método glom () para comprobar cómo se crean las particiones.

A.glom().collect()>> , , , ]

Ahora detenga el SC y reinicialícelo con 2 núcleos y vea qué sucede cuando repite el proceso.

sc.stop()sc=SparkContext(master="local")A = sc.parallelize(lst)A.glom().collect()>> , ]

El RDD ahora se distribuye en dos trozos, ¡no en cuatro!

Ha aprendido sobre el primer paso en el análisis de datos distribuidos, p. ej. controlar cómo se dividen los datos en trozos más pequeños para su posterior procesamiento

Algunos ejemplos de operaciones básicas con RDD & PySpark

>> 20

El primer elemento (first) y los primeros elementos (take)

A.first()>> 4A.take(3)>> 

Eliminar duplicados con el uso de distinct

NOTA: Esta operación requiere una mezcla aleatoria para detectar la duplicación entre particiones. Por lo tanto, es una operación lenta. No exageres.

A_distinct=A.distinct()A_distinct.collect()>> 

Para sumar, todos los elementos utilizan el método de reducción

Tenga en cuenta el uso de una función lambda en este,

A.reduce(lambda x,y:x+y)>> 80

O el método de suma directa ()

A.sum()>> 80

Encontrar el elemento máximo reduciendo

A.reduce(lambda x,y: x if x > y else y)>> 9

Encontrar la palabra más larga en un blob de texto

words = 'These are some of the best Macintosh computers ever'.split(' ')wordRDD = sc.parallelize(words)wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)>> 'computers'

Utilice el filtro para el filtrado basado en lógica

# Return RDD with elements (greater than zero) divisible by 3A.filter(lambda x:x%3==0 and x!=0).collect()>> 

Escriba funciones regulares de Python para usar con reduce()

def largerThan(x,y): """ Returns the last word among the longest words in a list """ if len(x)> len(y): return x elif len(y) > len(x): return y else: if x < y: return x else: return ywordRDD.reduce(largerThan)>> 'Macintosh'

Observe aquí el x < ¡y hace una comparación lexicográfica y determina que Macintosh es más grande que las computadoras!

Operación de asignación con una función lambda con PySpark

B=A.map(lambda x:x*x)B.collect()>> 

La asignación con una función Python normal en PySpark

def square_if_odd(x): """ Squares if odd, otherwise keeps the argument unchanged """ if x%2==1: return x*x else: return xA.map(square_if_odd).collect()>> 

groupby devuelve un RDD de elementos agrupados (iterables) según una operación de grupo dada

En el siguiente ejemplo, utilizamos una comprensión de lista junto con el grupo para crear una lista de dos elementos, cada uno con un encabezado (el resultado de la función lambda, módulo 2 simple aquí), y una lista ordenada de los elementos que dieron lugar a ese resultado. Puede imaginar fácilmente que este tipo de separación puede ser particularmente útil para procesar datos que deben almacenarse/almacenarse en función de una operación particular realizada sobre ellos.

result=A.groupBy(lambda x:x%2).collect()sorted()>> ), (1, )]

Usando histograma

El método histogram() toma una lista de contenedores/cubos y devuelve una tupla con el resultado del histograma (binning),

B.histogram()>> (, )

Operaciones de conjunto

También puede realizar operaciones de conjunto regulares en RDDs como – unión(), intersección(), resta () o cartesiana().

Echa un vistazo a este cuaderno de Jupyter para obtener más ejemplos.

Descarga de libros electrónicos de aprendizaje profundo

Evaluación perezosa con PySpark (y almacenamiento en caché)

La evaluación perezosa es una estrategia de evaluación/cálculo que prepara un mapa interno detallado paso a paso de la canalización de ejecución para una tarea de computación, pero retrasa la ejecución final hasta cuando sea absolutamente necesario. Esta estrategia está en el corazón de Spark para acelerar muchas operaciones de Big Data en paralelo.

Usemos dos núcleos de CPU para este ejemplo,

sc = SparkContext(master="local")

Hagamos un RDD con 1 millón de elementos

%%timerdd1 = sc.parallelize(range(1000000))>> CPU times: user 316 µs, sys: 5.13 ms, total: 5.45 ms, Wall time: 24.6 ms

Alguna función de cómputo – taketime

from math import cosdef taketime(x): return cos(x)

el tiempo es tomado por la función taketime

%%timetaketime(2)>> CPU times: user 21 µs, sys: 7 µs, total: 28 µs, Wall time: 31.5 µs>> -0.4161468365471424

Recuerde este resultado, la función taketime() tomó un tiempo de pared de 31.5 us. Por supuesto, el número exacto dependerá de la máquina en la que esté trabajando.

Ahora haga la operación de mapa en la función

%%timeinterim = rdd1.map(lambda x: taketime(x))>> CPU times: user 23 µs, sys: 8 µs, total: 31 µs, Wall time: 34.8 µs

Cómo es que cada función taketime toma 45.¿pero la operación del mapa con un RDD de 1 millón de elementos también tomó un tiempo similar?

Debido a la evaluación perezosa, es decir, no se calculó nada en el paso anterior, solo se hizo un plan de ejecución. La variable interim no apunta a una estructura de datos, sino que apunta a un plan de ejecución, expresado como un gráfico de dependencias. El gráfico de dependencias define cómo se calculan los RDD entre sí.

El método de ejecución real mediante reducción

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 11.6 ms, sys: 5.56 ms, total: 17.2 ms, Wall time: 15.6 s

Por lo tanto, el tiempo de pared aquí es de 15,6 segundos. ¿Recuerdas que la función taketime () tenía un tiempo de pared de 31.5 us? Por lo tanto, esperamos que el tiempo total sea del orden de ~ 31 segundos para una matriz de 1 millón. Debido a la operación paralela en dos núcleos, tomó ~ 15 segundos.

Ahora, no hemos guardado (materializado) ningún resultado intermedio en el ínterin, por lo que otra operación simple (por ejemplo, contar elementos > 0) tomará casi el mismo tiempo.

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 10.6 ms, sys: 8.55 ms, total: 19.2 ms, Wall time: 12.1 s

Almacenamiento en caché para reducir el tiempo de cómputo en operaciones similares (memoria de gasto)

¿Recuerda el gráfico de dependencias que construimos en el paso anterior? Podemos ejecutar el mismo cálculo que antes con el método de caché para indicar al gráfico de dependencias que planifique el almacenamiento en caché.

%%timeinterim = rdd1.map(lambda x: taketime(x)).cache()

El primer cálculo no mejorará, pero almacena en caché el resultado provisional,

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 16.4 ms, sys: 2.24 ms, total: 18.7 ms, Wall time: 15.3 s

Ahora ejecute el mismo método de filtro con la ayuda de resultados almacenados en caché,

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 14.2 ms, sys: 3.27 ms, total: 17.4 ms, Wall time: 811 ms

Wow! ¡El tiempo de cómputo se redujo a menos de un segundo desde 12 segundos antes! De esta manera, el almacenamiento en caché y la paralelización con la ejecución perezosa, es la característica principal de la programación con Spark.

Dataframe y SparkSQL

Además del RDD, la segunda estructura de datos clave en el framework de Spark es el DataFrame. Si ha trabajado con Python Pandas o R DataFrame, el concepto puede parecerle familiar.

Un DataFrame es una colección distribuida de filas bajo columnas con nombre. Es conceptualmente equivalente a una tabla en una base de datos relacional, una hoja de Excel con encabezados de columna o un marco de datos en R/Python, pero con optimizaciones más ricas bajo el capó. Los marcos de datos se pueden construir a partir de una amplia gama de fuentes, como archivos de datos estructurados, tablas en colmena, bases de datos externas o RDD existentes. También comparte algunas características comunes con RDD:

  • Naturaleza inmutable: Podemos crear DataFrame / RDD una vez, pero no podemos cambiarlo. Y podemos transformar un DataFrame / RDD después de aplicar transformaciones.
  • Evaluaciones perezosas: Esto significa que una tarea no se ejecuta hasta que se realiza una acción. Distribuido: RDD y DataFrame se distribuyen en la naturaleza.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-DataFrame-in-PySpark.png

Ventajas del DataFrame

  • Los DataFrames están diseñados para procesar una gran colección de datos estructurados o semiestructurados.
  • Las observaciones en el marco de datos de Spark se organizan en columnas con nombre, lo que ayuda a Apache Spark a comprender el esquema de un marco de datos. Esto ayuda a Spark a optimizar el plan de ejecución de estas consultas.
  • DataFrame en Apache Spark tiene la capacidad de manejar petabytes de datos.
  • DataFrame es compatible con una amplia gama de formatos y fuentes de datos.
  • Tiene soporte de API para diferentes lenguajes como Python, R, Scala, Java.

Ejemplo básico de marcos de datos

Para obtener información básica y ejemplos de uso típicos de marcos de datos, consulte los siguientes cuadernos de Jupyter,

Conceptos básicos de marcos de datos de Spark

Operaciones de marcos de datos de Spark

SparkSQL ayuda a cerrar la brecha para PySpark

Los almacenes de datos relacionales son fáciles de construir y consultar. Los usuarios y desarrolladores a menudo prefieren escribir consultas declarativas fáciles de interpretar en un lenguaje legible similar al humano, como SQL. Sin embargo, a medida que los datos comienzan a aumentar en volumen y variedad, el enfoque relacional no se escala lo suficientemente bien para crear aplicaciones de Big Data y sistemas analíticos.

Hemos tenido éxito en el dominio del análisis de Big Data con Hadoop y el paradigma MapReduce. Esto era potente, pero a menudo lento, y proporcionaba a los usuarios una interfaz de programación procedimental de bajo nivel que requería que las personas escribieran mucho código incluso para transformaciones de datos muy simples. Sin embargo, una vez que se lanzó Spark, realmente revolucionó la forma en que se realizaba el análisis de Big Data con un enfoque en la computación en memoria, tolerancia a errores, abstracciones de alto nivel y facilidad de uso.

Spark SQL esencialmente intenta cerrar la brecha entre los dos modelos que mencionamos anteriormente: los modelos relacionales y procedimentales. Spark SQL funciona a través de la API de marco de datos que puede realizar operaciones relacionales tanto en fuentes de datos externas como en las colecciones distribuidas integradas de Spark, ¡a escala!

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Spark-SQL-1024x489.png

¿Por qué Spark SQL es tan rápido y optimizado? La razón se debe a un nuevo optimizador extensible, Catalyst, basado en construcciones de programación funcional en Scala. Catalyst es compatible con la optimización basada en reglas y en costos. Si bien se han propuesto optimizadores extensibles en el pasado, por lo general han requerido un lenguaje complejo específico del dominio para especificar reglas. Generalmente, esto lleva a tener una curva de aprendizaje significativa y carga de mantenimiento. En contraste, Catalyst utiliza características estándar del lenguaje de programación Scala, como la coincidencia de patrones, para permitir que los desarrolladores usen el lenguaje de programación completo mientras hacen que las reglas sean fáciles de especificar.

Puede consultar el siguiente Jupyter notebook para obtener una introducción a las operaciones de base de datos con SparkSQL:

Conceptos básicos de operaciones de base de datos SparkSQL

¿Cómo va a Usar PySpark en su Proyecto?

Cubrimos los fundamentos del ecosistema de Apache Spark y cómo funciona junto con algunos ejemplos de uso básico de la estructura de datos core RDD con la interfaz de Python PySpark. Además, se discutieron DataFrame y SparkSQL junto con enlaces de referencia, por ejemplo, cuadernos de código.

Hay mucho más que aprender y experimentar con Apache Spark que se usa con Python. El sitio web de PySpark es una buena referencia para tener en su radar, y hacen actualizaciones y mejoras periódicas, así que esté atento a eso.

Y, si está interesado en hacer aprendizaje automático distribuido a gran escala con Apache Spark, eche un vistazo a la parte MLlib del ecosistema PySpark.

¿Disfrutaste De Este Blog de PySpark? Asegúrese de Revisar:

  • Privacy-Preserving Deep Learning – PySyft Versus TF-Encrypted

Accelerate-your-AI.-1024x202.jpg

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *