Exxact / Apprentissage en profondeur, HPC, AUDIOVISUEL, Distribution et plus

Qu’est-ce qu’Apache Spark ?

Apache Spark est l’une des nouvelles tendances les plus en vogue dans le domaine de la technologie. C’est le cadre avec probablement le plus grand potentiel pour réaliser le fruit du mariage entre le Big Data et l’apprentissage automatique.

Il fonctionne rapidement (jusqu’à 100 fois plus vite que le MapReduce Hadoop traditionnel grâce au fonctionnement en mémoire, offre des objets de données robustes, distribués et tolérants aux pannes (appelés RDD) et s’intègre parfaitement au monde de l’apprentissage automatique et de l’analyse graphique via des packages supplémentaires tels que Mlib et GraphX.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-288x300.png

Spark est implémenté sur Hadoop/HDFS et écrit principalement en Scala, un langage de programmation fonctionnel, similaire à Java. En fait, Scala a besoin de la dernière installation Java sur votre système et s’exécute sur JVM. Cependant, pour la plupart des débutants, le Scala n’est pas un langage qu’ils apprennent en premier pour s’aventurer dans le monde de la science des données. Heureusement, Spark fournit une merveilleuse intégration Python, appelée PySpark, qui permet aux programmeurs Python de s’interfacer avec le framework Spark et d’apprendre à manipuler des données à grande échelle et à travailler avec des objets et des algorithmes sur un système de fichiers distribué.

Dans cet article, nous allons apprendre les bases de PySpark. Il y a beaucoup de concepts (en constante évolution et introduits), et par conséquent, nous nous concentrons simplement sur les fondamentaux avec quelques exemples simples. Les lecteurs sont encouragés à s’en inspirer et à explorer davantage par eux-mêmes.

La Courte histoire d’Apache Spark

Apache Spark a commencé comme un projet de recherche à l’AMPLab de l’Université de Berkeley en 2009 et a été ouvert début 2010. C’était un projet de classe à l’UC Berkeley. L’idée était de construire un cadre de gestion de cluster, qui peut prendre en charge différents types de systèmes informatiques de cluster. Bon nombre des idées derrière le système ont été présentées dans divers documents de recherche au fil des ans. Après sa sortie, Spark est devenu une vaste communauté de développeurs et a rejoint l’Apache Software Foundation en 2013. Aujourd’hui, le projet est développé en collaboration par une communauté de centaines de développeurs de centaines d’organisations.

Spark n’est pas un langage de programmation

Une chose à retenir est que Spark n’est pas un langage de programmation comme Python ou Java. Il s’agit d’un moteur de traitement de données distribué à usage général, adapté à une utilisation dans un large éventail de circonstances. Il est particulièrement utile pour le traitement de big Data à grande échelle et à grande vitesse.

Les développeurs d’applications et les data scientists intègrent généralement Spark dans leurs applications pour interroger, analyser et transformer rapidement les données à grande échelle. Parmi les tâches les plus fréquemment associées à Spark, citons – les tâches par lots ETL et SQL sur de grands ensembles de données (souvent de téraoctets de taille), – le traitement de données en streaming à partir d’appareils et de nœuds IoT, de données provenant de divers capteurs, de systèmes financiers et transactionnels de toutes sortes, et – les tâches d’apprentissage automatique pour le commerce électronique ou les applications informatiques.

À la base, Spark s’appuie sur le framework Hadoop/HDFS pour gérer les fichiers distribués. Il est principalement implémenté avec Scala, une variante de langage fonctionnel de Java. Il existe un moteur de traitement de données Spark de base, mais en plus de cela, de nombreuses bibliothèques sont développées pour l’analyse de requêtes de type SQL, l’apprentissage automatique distribué, le calcul de graphiques à grande échelle et le traitement de données en continu. Plusieurs langages de programmation sont pris en charge par Spark sous la forme de bibliothèques d’interfaces faciles : Java, Python, Scala et R.

Spark Utilise le paradigme MapReduce pour le traitement distribué

L’idée de base du traitement distribué est de diviser les morceaux de données en petits morceaux gérables (y compris un filtrage et un tri), de rapprocher le calcul des données, c’est-à-dire d’utiliser de petits nœuds d’un grand cluster pour des tâches spécifiques, puis de les recombiner. La partie de division est appelée l’action ’Map’ et la recombinaison est appelée l’action ’Reduce’. Ensemble, ils fabriquent le célèbre paradigme « MapReduce », introduit par Google vers 2004 (voir l’article original ici).

Par exemple, si un fichier contient 100 enregistrements à traiter, 100 mappeurs peuvent s’exécuter ensemble pour traiter un enregistrement chacun. Ou peut-être que 50 mappeurs peuvent s’exécuter ensemble pour traiter deux enregistrements chacun. Une fois que tous les mappeurs ont terminé le traitement, le framework mélange et trie les résultats avant de les transmettre aux réducteurs. Un réducteur ne peut pas démarrer alors qu’un mappeur est toujours en cours. Toutes les valeurs de sortie de carte qui ont la même clé sont affectées à un seul réducteur, qui agrège ensuite les valeurs de cette clé.

PE_TITAN-RTX-Blog-1024x127.jpg

Comment configurer PySpark

Si vous êtes déjà familier avec Python et les bibliothèques telles que Pandas et Numpy, alors PySpark est une excellente extension / framework à apprendre afin de créer des analyses et des pipelines plus évolutifs et gourmands en données en utilisant la puissance de Spark en arrière-plan.

Le processus exact d’installation et de configuration de l’environnement PySpark (sur une machine autonome) est quelque peu impliqué et peut varier légèrement en fonction de votre système et de votre environnement. L’objectif est de faire fonctionner votre environnement de science des données Jupyter habituel avec Spark en arrière-plan à l’aide du package PySpark.

Cet article sur Medium fournit plus de détails sur le processus de configuration étape par étape.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-Setup.png

Vous pouvez également utiliser la configuration de Databricks pour pratiquer Spark. Cette société a été créée par les créateurs originaux de Spark et dispose d’un excellent environnement prêt à être lancé pour effectuer des analyses distribuées avec Spark.

Mais l’idée est toujours la même. Vous distribuez (et répliquez) votre grand ensemble de données en petits morceaux fixes sur de nombreux nœuds. Vous rapprochez ensuite le moteur de calcul pour que l’ensemble de l’opération soit parallélisé, tolérant aux pannes et évolutif.

En travaillant avec PySpark et Jupyter notebook, vous pouvez apprendre tous ces concepts sans rien dépenser sur la plate-forme AWS ou Databricks. Vous pouvez également vous interfacer facilement avec SparkSQL et MLlib pour la manipulation de bases de données et l’apprentissage automatique. Il sera beaucoup plus facile de commencer à travailler avec de grands clusters réels si vous avez intériorisé ces concepts au préalable!

Resilient Distributed Dataset (RDD) et SparkContext

De nombreux programmes Spark tournent autour du concept d’un resilient distributed dataset (RDD), qui est une collection d’éléments tolérants aux pannes pouvant être exploités en parallèle. SparkContext réside dans le programme Pilote et gère les données distribuées sur les nœuds de travail via le gestionnaire de cluster. La bonne chose à propos de l’utilisation de PySpark est que toute cette complexité du partitionnement des données et de la gestion des tâches est gérée automatiquement à l’arrière et le programmeur peut se concentrer sur le travail d’analyse spécifique ou d’apprentissage automatique lui-même.

Les-Avantages-Exemples-d'utilisation-d'Apache-Spark-avec-PySpark-en-Python-Rdd-1.png

rdd–1

Il existe deux façons de créer des RDD : paralléliser une collection existante dans votre programme de pilote ou référencer un ensemble de données dans un système de stockage externe, tel qu’un système de fichiers partagé, HDFS, HBase ou toute source de données offrant un format d’entrée Hadoop.

Pour illustrer avec une approche basée sur Python, nous donnerons des exemples du premier type ici. Nous pouvons créer un tableau Python simple de 20 entiers aléatoires (entre 0 et 10), en utilisant Numpy random.randint(), puis créez un objet RDD comme suit,

from pyspark import SparkContextimport numpy as npsc=SparkContext(master="local")lst=np.random.randint(0,10,20)A=sc.parallelize(lst)

Notez le ‘4’ dans l’argument. Il désigne 4 cœurs de calcul (dans votre machine locale) à utiliser pour cet objet SparkContext. Si nous vérifions le type de l’objet RDD, nous obtenons ce qui suit,

type(A)>> pyspark.rdd.RDD

En face de la parallélisation est la collection (avec collect()) qui apporte tous les éléments distribués et les renvoie au nœud principal.

A.collect()>> 

Mais A n’est plus un simple tableau Numpy. Nous pouvons utiliser la méthode glom() pour vérifier comment les partitions sont créées.

A.glom().collect()>> , , , ]

Maintenant, arrêtez le SC et réinitialisez-le avec 2 cœurs et voyez ce qui se passe lorsque vous répétez le processus.

sc.stop()sc=SparkContext(master="local")A = sc.parallelize(lst)A.glom().collect()>> , ]

Le RDD est maintenant réparti sur deux morceaux, pas quatre!

Vous avez appris la première étape de l’analyse de données distribuées, c’est-à-dire contrôler la façon dont vos données sont partitionnées sur de plus petits morceaux pour un traitement ultérieur

Quelques exemples d’opérations de base avec RDD &PySpark

>> 20

Le premier élément (premier) et les premiers éléments (prendre)

A.first()>> 4A.take(3)>> 

Suppression des doublons en utilisant distinct

REMARQUE: Cette opération nécessite une lecture aléatoire afin de détecter la duplication entre les partitions. C’est donc une opération lente. N’en faites pas trop.

A_distinct=A.distinct()A_distinct.collect()>> 

Pour additionner, tous les éléments utilisent la méthode reduce

Notez l’utilisation d’une fonction lambda dans celle-ci,

A.reduce(lambda x,y:x+y)>> 80

Ou la méthode direct sum()

A.sum()>> 80

Trouver un élément maximum en réduisant

A.reduce(lambda x,y: x if x > y else y)>> 9

Trouver le mot le plus long dans un blob de texte

words = 'These are some of the best Macintosh computers ever'.split(' ')wordRDD = sc.parallelize(words)wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)>> 'computers'

Utilisez le filtre pour un filtrage basé sur la logique

# Return RDD with elements (greater than zero) divisible by 3A.filter(lambda x:x%3==0 and x!=0).collect()>> 

Écrivez des fonctions Python régulières à utiliser avec reduce ()

def largerThan(x,y): """ Returns the last word among the longest words in a list """ if len(x)> len(y): return x elif len(y) > len(x): return y else: if x < y: return x else: return ywordRDD.reduce(largerThan)>> 'Macintosh'

Notez ici le x < y effectue une comparaison lexicographique et détermine que Macintosh est plus grand que les ordinateurs!

Opération de mappage avec une fonction lambda avec PySpark

B=A.map(lambda x:x*x)B.collect()>> 

Mappage avec une fonction Python régulière dans PySpark

def square_if_odd(x): """ Squares if odd, otherwise keeps the argument unchanged """ if x%2==1: return x*x else: return xA.map(square_if_odd).collect()>> 

groupby renvoie un RDD d’éléments groupés (itérables) selon une opération de groupe donnée

Dans l’exemple suivant, nous utilisons une compréhension de liste avec le groupe pour créer une liste de deux éléments, chacun ayant un en-tête (le résultat de la fonction lambda, simple modulo 2 ici), et une liste triée des éléments qui ont donné lieu à ce résultat. Vous pouvez facilement imaginer que ce type de séparation peut être particulièrement utile pour le traitement de données qui doivent être regroupées / mises en conserve en fonction d’une opération particulière effectuée sur elles.

result=A.groupBy(lambda x:x%2).collect()sorted()>> ), (1, )]

En utilisant histogram

La méthode histogram() prend une liste de bins/buckets et renvoie un tuple avec le résultat de l’histogramme (binning),

B.histogram()>> (, )

Set operations

Vous pouvez également effectuer des opérations de set régulières sur des RDD comme -union ( ), intersection(), soustraction() ou cartésienne().

Consultez ce cahier Jupyter pour plus d’exemples.

Téléchargement d'ebook d'apprentissage profond

Évaluation paresseuse avec PySpark (et mise en cache)

L’évaluation paresseuse est une stratégie d’évaluation / calcul qui prépare une carte interne détaillée étape par étape du pipeline d’exécution pour une tâche de calcul mais retarde l’exécution finale jusqu’au moment où elle est absolument nécessaire. Cette stratégie est au cœur de Spark pour accélérer de nombreuses opérations Big Data parallélisées.

Utilisons deux cœurs de processeur pour cet exemple,

sc = SparkContext(master="local")

Créez un RDD avec 1 million d’éléments

%%timerdd1 = sc.parallelize(range(1000000))>> CPU times: user 316 µs, sys: 5.13 ms, total: 5.45 ms, Wall time: 24.6 ms

Une fonction de calcul –taketime

from math import cosdef taketime(x): return cos(x)

Vérifiez combien de temps est nécessaire pour prise par la fonction taketime

%%timetaketime(2)>> CPU times: user 21 µs, sys: 7 µs, total: 28 µs, Wall time: 31.5 µs>> -0.4161468365471424

Rappelez-vous ce résultat, la fonction taketime() a pris un temps de mur de 31,5 us. Bien sûr, le nombre exact dépendra de la machine sur laquelle vous travaillez.

Maintenant, faites l’opération de carte sur la fonction

%%timeinterim = rdd1.map(lambda x: taketime(x))>> CPU times: user 23 µs, sys: 8 µs, total: 31 µs, Wall time: 34.8 µs

Comment se fait-il que chaque fonction taketime prenne 45.8 mais l’opération de la carte avec un RDD de 1 million d’éléments a également pris un temps similaire?

En raison de l’évaluation paresseuse, c’est-à-dire que rien n’a été calculé à l’étape précédente, juste un plan d’exécution a été fait. La variable interim ne pointe pas vers une structure de données, elle pointe plutôt vers un plan d’exécution, exprimé sous forme de graphique de dépendance. Le graphique des dépendances définit comment les RDD sont calculés les uns des autres.

L’exécution réelle par la méthode de réduction

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 11.6 ms, sys: 5.56 ms, total: 17.2 ms, Wall time: 15.6 s

Ainsi, le temps de paroi ici est de 15,6 secondes. Rappelez-vous, la fonction taketime() avait un temps de mur de 31,5 us? Par conséquent, nous nous attendons à ce que le temps total soit de l’ordre de ~ 31 secondes pour un tableau de 1 million. En raison du fonctionnement parallèle sur deux cœurs, cela a pris ~ 15 secondes.

Maintenant, nous n’avons enregistré (matérialisé) aucun résultat intermédiaire dans l’intervalle, donc une autre opération simple (par exemple, compter des éléments > 0) prendra presque le même temps.

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 10.6 ms, sys: 8.55 ms, total: 19.2 ms, Wall time: 12.1 s

Mise en cache pour réduire le temps de calcul sur une opération similaire (mémoire dépensée)

Rappelez-vous le graphique de dépendance que nous avons construit à l’étape précédente? Nous pouvons exécuter le même calcul qu’auparavant avec la méthode cache pour indiquer au graphique de dépendance de planifier la mise en cache.

%%timeinterim = rdd1.map(lambda x: taketime(x)).cache()

Le premier calcul ne s’améliorera pas, mais il met en cache le résultat intermédiaire,

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 16.4 ms, sys: 2.24 ms, total: 18.7 ms, Wall time: 15.3 s

Exécutez maintenant la même méthode de filtrage à l’aide du résultat mis en cache,

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 14.2 ms, sys: 3.27 ms, total: 17.4 ms, Wall time: 811 ms

Wow! Le temps de calcul est descendu à moins d’une seconde contre 12 secondes auparavant! De cette façon, la mise en cache et la parallélisation avec exécution paresseuse sont la caractéristique principale de la programmation avec Spark.

Dataframe et SparkSQL

En dehors du RDD, la deuxième structure de données clé dans le framework Spark est la trame de données. Si vous avez travaillé avec Python Pandas ou R DataFrame, le concept peut sembler familier.

Une trame de données est une collection distribuée de lignes sous des colonnes nommées. Il est conceptuellement équivalent à une table dans une base de données relationnelle, une feuille Excel avec des en-têtes de colonne ou un bloc de données en R / Python, mais avec des optimisations plus riches sous le capot. Les cadres de données peuvent être construits à partir d’un large éventail de sources telles que des fichiers de données structurés, des tables dans Hive, des bases de données externes ou des RDD existants. Il partage également certaines caractéristiques communes avec RDD:

  • De nature immuable: Nous pouvons créer une seule trame de données/RDD mais nous ne pouvons pas la modifier. Et nous pouvons transformer une trame de données / RDD après avoir appliqué des transformations.
  • Évaluations paresseuses : Cela signifie qu’une tâche n’est pas exécutée tant qu’une action n’est pas effectuée. Distribué: RDD et DataFrame sont tous deux distribués dans la nature.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-DataFrame-in-PySpark.png

Avantages de la trame de données

  • Les trames de données sont conçues pour traiter une grande collection de données structurées ou semi-structurées.
  • Les observations dans la trame de données Spark sont organisées sous des colonnes nommées, ce qui aide Apache Spark à comprendre le schéma d’une trame de données. Cela aide Spark à optimiser le plan d’exécution de ces requêtes.
  • DataFrame dans Apache Spark a la capacité de gérer des pétaoctets de données.
  • DataFrame prend en charge un large éventail de formats et de sources de données.
  • Il a un support API pour différents langages comme Python, R, Scala, Java.

Exemple de bases de la trame de données

Pour les principes fondamentaux et les exemples d’utilisation typiques des trames de données, veuillez consulter les blocs-notes Jupyter suivants,

Bases de la trame de données Spark

Opérations de trame de données Spark

SparkSQL Aide à combler l’écart pour PySpark

Les magasins de données relationnelles sont faciles à créer et à interroger. Les utilisateurs et les développeurs préfèrent souvent écrire des requêtes déclaratives faciles à interpréter dans un langage lisible de type humain tel que SQL. Cependant, à mesure que les données commencent à augmenter en volume et en variété, l’approche relationnelle ne s’adapte pas assez bien pour créer des applications Big Data et des systèmes analytiques.

Nous avons eu du succès dans le domaine de l’analyse de Big Data avec Hadoop et le paradigme MapReduce. C’était puissant, mais souvent lent, et donnait aux utilisateurs une interface de programmation procédurale de bas niveau qui obligeait les gens à écrire beaucoup de code, même pour des transformations de données très simples. Cependant, une fois que Spark a été publié, il a vraiment révolutionné la façon dont l’analyse des Big Data était effectuée en mettant l’accent sur le calcul en mémoire, la tolérance aux pannes, les abstractions de haut niveau et la facilité d’utilisation.

Spark SQL essaie essentiellement de combler le fossé entre les deux modèles que nous avons mentionnés précédemment — les modèles relationnels et procéduraux. Spark SQL fonctionne via l’API DataFrame qui peut effectuer des opérations relationnelles à la fois sur des sources de données externes et sur les collections distribuées intégrées de Spark – à grande échelle !

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Spark-SQL-1024x489.png

Pourquoi Spark SQL est-il si rapide et optimisé ? La raison en est un nouvel optimiseur extensible, Catalyst, basé sur des constructions de programmation fonctionnelle dans Scala. Catalyst prend en charge l’optimisation basée sur des règles et basée sur les coûts. Bien que des optimiseurs extensibles aient été proposés dans le passé, ils ont généralement nécessité un langage spécifique à un domaine complexe pour spécifier des règles. Habituellement, cela entraîne une courbe d’apprentissage et une charge de maintenance importantes. En revanche, Catalyst utilise des fonctionnalités standard du langage de programmation Scala, telles que le pattern-matching, pour permettre aux développeurs d’utiliser le langage de programmation complet tout en rendant les règles faciles à spécifier.

Vous pouvez vous référer au bloc-notes Jupyter suivant pour une introduction aux opérations de base de données avec SparkSQL :

Bases des opérations de base de données SparkSQL

Comment Allez-Vous Utiliser PySpark dans Votre Projet ?

Nous avons couvert les principes fondamentaux de l’écosystème Apache Spark et son fonctionnement ainsi que quelques exemples d’utilisation de base de la structure de données de base RDD avec l’interface Python PySpark. En outre, DataFrame et SparkSQL ont été discutés avec des liens de référence, par exemple des blocs-notes de code.

Il y a tellement plus à apprendre et à expérimenter avec Apache Spark utilisé avec Python. Le site Web de PySpark est une bonne référence à avoir sur votre radar, et ils font des mises à jour et des améliorations régulières – alors gardez un œil sur cela.

Et, si vous souhaitez faire de l’apprentissage automatique distribué à grande échelle avec Apache Spark, consultez la partie MLLib de l’écosystème PySpark.

Avez-vous Apprécié Ce Blog De PySpark ? Assurez-vous de vérifier:

  • Privacy-Preserving Deep Learning – PySyft Versus TF-Encrypted

Accelerate-your-AI.-1024x202.jpg

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *