Exxact / Deep Learning, HPC, AV, Distribution & mai mult

ce este Apache Spark?

Apache Spark este una dintre cele mai noi tendințe în domeniul tehnologiei. Este cadrul cu probabil cel mai mare potențial de a realiza rodul căsătoriei dintre Big Data și Machine Learning.

rulează rapid (până la 100x mai rapid decât Hadoop MapReduce tradițional datorită funcționării în memorie, oferă obiecte de date robuste, distribuite, tolerante la erori (numite RDD) și se integrează frumos cu lumea învățării automate și a analizelor grafice prin pachete suplimentare precum Mlib și GraphX.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-288x300.png

Spark este implementat pe Hadoop / HDFS și scris mai ales în Scala, un limbaj de programare funcțional, similar cu Java. De fapt, Scala are nevoie de cea mai recentă instalare Java pe sistemul dvs. și rulează pe JVM. Cu toate acestea, pentru majoritatea începătorilor, Scala nu este o limbă pe care o învață mai întâi să se aventureze în lumea științei datelor. Din fericire, Spark oferă o integrare Python minunată, numită PySpark, care permite programatorilor Python să interacționeze cu cadrul Spark și să învețe cum să manipuleze datele la scară și să lucreze cu obiecte și algoritmi pe un sistem de fișiere distribuit.

în acest articol, vom învăța elementele de bază ale PySpark. Există o mulțime de concepte (în continuă evoluție și introduse) și, prin urmare, ne concentrăm doar pe fundamente cu câteva exemple simple. Cititorii sunt încurajați să se bazeze pe acestea și să exploreze mai mult pe cont propriu.

Scurta istorie a Apache Spark

Apache Spark a început ca un proiect de cercetare la UC Berkeley AMPLab în 2009 și a fost deschis la începutul anului 2010. A fost un proiect de clasă la UC Berkeley. Ideea a fost de a construi un cadru de management cluster, care poate sprijini diferite tipuri de sisteme de calcul cluster. Multe dintre ideile din spatele sistemului au fost prezentate în diferite lucrări de cercetare de-a lungul anilor. După ce a fost lansat, Spark a devenit o comunitate largă de dezvoltatori și s-a mutat la Apache Software Foundation în 2013. Astăzi, proiectul este dezvoltat în colaborare de o comunitate de sute de dezvoltatori din sute de organizații.

Spark nu este un limbaj de programare

Un lucru de reținut este că Spark nu este un limbaj de programare precum Python sau Java. Este un motor de prelucrare a datelor distribuite de uz general, potrivit pentru utilizare într-o gamă largă de circumstanțe. Este deosebit de util pentru prelucrarea datelor mari atât la scară, cât și la viteză mare.

dezvoltatorii de aplicații și oamenii de știință de date încorporează în general Spark în aplicațiile lor pentru a interoga, analiza și transforma rapid datele la scară. Unele dintre sarcinile care sunt cel mai frecvent asociate cu Spark includ, – ETL și SQL lot de locuri de muncă în seturi mari de date (de multe ori de terabytes de dimensiune), – prelucrarea datelor de streaming de la dispozitive și noduri IoT, date de la diverși senzori, sisteme financiare și tranzacționale de toate tipurile, și – sarcini de învățare mașină pentru e-commerce sau aplicații IT.

în centrul său, Spark se bazează pe partea de sus a cadrului Hadoop / HDFS pentru manipularea fișierelor distribuite. Este implementat în cea mai mare parte cu Scala, o variantă funcțională a limbajului Java. Există un motor de procesare a datelor de bază Spark, dar, în plus, există multe biblioteci dezvoltate pentru analiza interogărilor de tip SQL, învățarea automată distribuită, calculul graficelor la scară largă și prelucrarea datelor în flux. Mai multe limbaje de programare sunt acceptate de Spark sub formă de biblioteci de interfață ușoară: Java, Python, Scala și R.

Spark folosește paradigma MapReduce pentru procesarea distribuită

ideea de bază a procesării distribuite este de a împărți bucățile de date în bucăți mici gestionabile (inclusiv unele filtrări și sortări), de a aduce calculul aproape de date, adică folosind noduri mici ale unui cluster mare pentru anumite lucrări și apoi re-combinați-le înapoi. Porțiunea de divizare se numește acțiunea ‘Map’, iar recombinarea se numește acțiunea’ Reduce’. Împreună, ei fac faimoasa paradigmă ‘MapReduce’, care a fost introdusă de Google în jurul anului 2004 (a se vedea lucrarea originală aici).

de exemplu, dacă un fișier are 100 de înregistrări de procesat, 100 de cartografi pot rula împreună pentru a procesa câte o înregistrare. Sau poate 50 mappers pot rula împreună pentru a procesa două înregistrări fiecare. După ce toate mappers prelucrare completă, cadrul shuffles și sortează rezultatele înainte de a trece-le pe reductoare. Un reductor nu poate porni în timp ce un mapper este încă în desfășurare. Toate valorile de ieșire ale hărții care au aceeași cheie sunt atribuite unui singur reductor, care apoi agregă valorile pentru acea cheie.

PE_TITAN-RTX-Blog-1024x127.jpg

cum se configurează PySpark

Dacă sunteți deja familiarizați cu Python și biblioteci, cum ar fi Panda și Numpy, atunci PySpark este o extensie / cadru excelent de învățat pentru a crea analize și conducte mai scalabile, intensive de date, utilizând puterea Spark în fundal.

procesul exact de instalare și configurare a mediului PySpark (pe o mașină independentă) este oarecum implicat și poate varia ușor în funcție de sistemul și mediul dvs. Scopul este de a obține mediul dvs. obișnuit de știință a datelor Jupyter care lucrează cu Spark în fundal folosind pachetul PySpark.

Acest articol pe mediu oferă mai multe detalii despre procesul de configurare pas cu pas.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-Setup.png

alternativ, puteți utiliza Databricks setup pentru practicarea Spark. Această companie a fost creată de creatorii originali ai Spark și are un mediu excelent gata de lansare pentru a face analize distribuite cu Spark.

dar ideea este întotdeauna aceeași. Distribuiți (și replicați) setul de date mare în bucăți mici fixe pe mai multe noduri. Apoi aduceți motorul de calcul aproape de ele, astfel încât întreaga operațiune să fie paralelizată, tolerantă la erori și scalabilă.

lucrând cu pyspark și Jupyter notebook, puteți învăța toate aceste concepte fără a cheltui nimic pe AWS sau platforma Databricks. De asemenea, puteți interfața cu ușurință cu SparkSQL și MLlib pentru manipularea bazelor de date și învățarea automată. Va fi mult mai ușor să începeți să lucrați cu clustere mari din viața reală dacă ați interiorizat aceste concepte în prealabil!

resilient Distributed Dataset (RDD) și SparkContext

multe programe Spark se învârt în jurul conceptului de resilient distributed dataset (RDD), care este o colecție tolerantă la erori de elemente care pot fi operate în paralel. SparkContext se află în programul Driver și gestionează datele distribuite pe nodurile lucrătorului prin managerul de cluster. Lucrul bun despre utilizarea PySpark este că toată această complexitate a partiționării datelor și a gestionării sarcinilor este gestionată automat în spate, iar programatorul se poate concentra pe analiza specifică sau pe lucrarea de învățare automată în sine.

beneficiile-Exemple-de-Utilizare-Apache-Spark-cu-PySpark-in-Python-Rdd-1.png

rdd-1

există două moduri de a crea RDDs–paralelizarea unei colecții existente în programul driver sau referențierea unui set de date într – un sistem de stocare extern, cum ar fi un sistem de fișiere partajat, HDFS, HBase sau orice sursă de date care oferă un Hadoop InputFormat.

pentru ilustrare cu o abordare bazată pe Python, vom da exemple de primul tip aici. Putem crea o matrice Python simplu de 20 numere întregi aleatoare (între 0 și 10), folosind Numpy aleatoare.randint (), apoi creați un obiect RDD după cum urmează,

from pyspark import SparkContextimport numpy as npsc=SparkContext(master="local")lst=np.random.randint(0,10,20)A=sc.parallelize(lst)

notați ‘4’ în argument. Aceasta denotă 4 nuclee de calcul (în mașina locală) pentru a fi utilizate pentru acest obiect SparkContext. Dacă verificăm tipul obiectului RDD, obținem următoarele:

type(A)>> pyspark.rdd.RDD

opus paralelizării este colecția (cu collect()) care aduce toate elementele distribuite și le returnează la nodul capului.

A.collect()>> 

dar A nu mai este o matrice Numpy simplă. Putem folosi metoda glom () pentru a verifica modul în care sunt create partițiile.

A.glom().collect()>> , , , ]

acum opriți SC și reinițializați-l cu 2 nuclee și vedeți ce se întâmplă când repetați procesul.

sc.stop()sc=SparkContext(master="local")A = sc.parallelize(lst)A.glom().collect()>> , ]

RDD este acum distribuit pe două bucăți, nu patru!

ați învățat despre primul pas în analiza datelor distribuite, adică. controlul modului în care datele dvs. sunt împărțite pe bucăți mai mici pentru prelucrare ulterioară

câteva exemple de operații de bază cu RDD & PySpark

>> 20

primul element (primul) și primele câteva elemente (ia)

A.first()>> 4A.take(3)>> 

eliminarea duplicatelor cu utilizarea distinctă

Notă: Această operație necesită o amestecare pentru a detecta duplicarea între partiții. Deci, este o operație lentă. Nu exagera.

A_distinct=A.distinct()A_distinct.collect()>> 

pentru a rezuma, toate elementele folosesc metoda reduce

Notă Utilizarea unei funcții lambda în acest,

A.reduce(lambda x,y:x+y)>> 80

sau suma directă() metoda

A.sum()>> 80

găsirea elementului maxim prin reducerea

A.reduce(lambda x,y: x if x > y else y)>> 9

găsirea celui mai lung cuvânt dintr-o pată de text

words = 'These are some of the best Macintosh computers ever'.split(' ')wordRDD = sc.parallelize(words)wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)>> 'computers'

utilizați filtrul pentru filtrarea bazată pe logică

# Return RDD with elements (greater than zero) divisible by 3A.filter(lambda x:x%3==0 and x!=0).collect()>> 

scrieți funcții Python obișnuite pentru a utiliza cu reducerea()

def largerThan(x,y): """ Returns the last word among the longest words in a list """ if len(x)> len(y): return x elif len(y) > len(x): return y else: if x < y: return x else: return ywordRDD.reduce(largerThan)>> 'Macintosh'

notați aici X < y face o comparație lexicografică și determină că Macintosh este mai mare decât computerele!

operație de mapare cu o funcție lambda cu PySpark

B=A.map(lambda x:x*x)B.collect()>> 

mapare cu o funcție Python obișnuită în PySpark

def square_if_odd(x): """ Squares if odd, otherwise keeps the argument unchanged """ if x%2==1: return x*x else: return xA.map(square_if_odd).collect()>> 

groupby returnează un RDD de elemente grupate (iterabile) conform unei operații de grup date

în exemplul următor, folosim o listă de înțelegere împreună cu grupul pentru a crea o listă de două elemente, fiecare având un antet (rezultatul funcției Lambda, modulo simplu 2 aici) și o listă sortată a elementelor care au dat naștere acestui rezultat. Vă puteți imagina cu ușurință că acest tip de separare poate veni deosebit de util pentru prelucrarea datelor care trebuie să fie binned/conserve pe baza unei anumite operațiuni efectuate asupra lor.

result=A.groupBy(lambda x:x%2).collect()sorted()>> ), (1, )]

folosind histograma

metoda histograma() ia o listă de containere/găleți și returnează un tuplu cu rezultatul histogramei (binning),

B.histogram()>> (, )

operații Set

puteți face, de asemenea, operațiunile set regulat pe RDDs ca – union (), intersecție (), scădere () sau cartezian().

Check out acest notebook Jupyter pentru mai multe exemple.

Deep Learning Ebook Download

evaluare Lazy cu PySpark (și Cache)

evaluarea Lazy este o strategie de evaluare/calcul care pregătește o hartă internă detaliată pas cu pas a conductei de execuție pentru o sarcină de calcul, dar întârzie execuția finală până când este absolut necesară. Această strategie se află în centrul Spark pentru accelerarea multor operațiuni paralelizate de date mari.

să folosim două nuclee CPU pentru acest exemplu,

sc = SparkContext(master="local")

faceți un RDD cu 1 milion de elemente

%%timerdd1 = sc.parallelize(range(1000000))>> CPU times: user 316 µs, sys: 5.13 ms, total: 5.45 ms, Wall time: 24.6 ms

unele funcții de calcul – taketime

from math import cosdef taketime(x): return cos(x)

verificați cât timp luate de funcția taketime

%%timetaketime(2)>> CPU times: user 21 µs, sys: 7 µs, total: 28 µs, Wall time: 31.5 µs>> -0.4161468365471424

amintiți-vă acest rezultat, funcția taketime() a avut un timp de perete de 31,5 us. Desigur, numărul exact va depinde de mașina la care lucrați.

acum face operația hartă pe funcția

%%timeinterim = rdd1.map(lambda x: taketime(x))>> CPU times: user 23 µs, sys: 8 µs, total: 31 µs, Wall time: 34.8 µs

cum se face fiecare funcție taketime ia 45.8 SUA, dar operațiunea hartă cu un 1 milioane de elemente RDD, de asemenea, a luat un timp similar?

Din cauza evaluării leneșe, adică nu s-a calculat nimic în pasul anterior, s-a făcut doar un plan de execuție. Variabila intermediară nu indică o structură de date, în schimb, indică un plan de execuție, exprimat ca un grafic de dependență. Graficul de dependență definește modul în care RDD-urile sunt calculate unele de altele.

execuția efectivă prin metoda de reducere

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 11.6 ms, sys: 5.56 ms, total: 17.2 ms, Wall time: 15.6 s

deci, timpul de perete aici este de 15,6 secunde. Amintiți-vă, funcția taketime() a avut un timp de perete de 31.5 SUA? Prin urmare, ne așteptăm ca timpul total să fie de ordinul a ~ 31 de secunde pentru o matrice de 1 milion. Din cauza funcționării paralele pe două nuclee, a durat ~ 15 secunde.

acum, nu am salvat (materializat) niciun rezultat intermediar între timp, astfel încât o altă operație simplă (de exemplu, numărarea elementelor> 0) va dura aproape în același timp.

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 10.6 ms, sys: 8.55 ms, total: 19.2 ms, Wall time: 12.1 s

Caching pentru a reduce timpul de calcul pe operație similară (memorie de cheltuieli)

vă amintiți graficul de dependență pe care l-am construit în pasul anterior? Putem rula același calcul ca înainte cu metoda cache pentru a spune graficului de dependență să planifice pentru cache.

%%timeinterim = rdd1.map(lambda x: taketime(x)).cache()

primul calcul nu se va îmbunătăți, dar cache rezultatul interimar,

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 16.4 ms, sys: 2.24 ms, total: 18.7 ms, Wall time: 15.3 s

acum rula aceeași metodă de filtrare cu ajutorul rezultat cache,

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 14.2 ms, sys: 3.27 ms, total: 17.4 ms, Wall time: 811 ms

Wow! Timpul de calcul a scăzut la mai puțin de o secundă de la 12 secunde mai devreme! În acest fel, cache-ul și paralelizarea cu execuția leneșă, este caracteristica de bază a programării cu Spark.

Dataframe și SparkSQL

În afară de RDD, a doua structură de date cheie din cadrul Spark este DataFrame. Dacă ați lucrat cu Panda Python sau R DataFrame, conceptul poate părea familiar.

un cadru de date este o colecție distribuită de rânduri sub coloane numite. Este echivalent conceptual cu un tabel dintr-o bază de date relațională, o foaie Excel cu anteturi de coloană sau un cadru de date în R/Python, dar cu optimizări mai bogate sub capotă. DataFrames pot fi construite dintr-o gamă largă de surse, cum ar fi fișiere de date structurate, tabele în stup, baze de date externe, sau RDDs existente. De asemenea, împărtășește unele caracteristici comune cu RDD:

  • imuabil în natură: putem crea DataFrame / RDD o dată, dar nu o putem schimba. Și putem transforma un cadru de date / RDD după aplicarea transformărilor.
  • evaluări leneșe: aceasta înseamnă că o sarcină nu este executată până când nu se efectuează o acțiune. Distribuit: RDD și DataFrame ambele sunt distribuite în natură.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-DataFrame-in-PySpark.png

avantajele cadrului de date

  • cadrele de date sunt concepute pentru procesarea unei colecții mari de date structurate sau semi-structurate.
  • observațiile din Spark DataFrame sunt organizate sub coloane numite, ceea ce ajută Apache Spark să înțeleagă schema unui cadru de date. Acest lucru ajută Spark optimiza planul de execuție pe aceste interogări.
  • DataFrame în Apache Spark are capacitatea de a gestiona petabytes de date.
  • DataFrame are suport pentru o gamă largă de formate de date și surse.
  • are suport API pentru diferite limbi precum Python, R, Scala, Java.

dataframe bazele exemplu

pentru fundamentele și exemple tipice de utilizare a DataFrames, vă rugăm să consultați următoarele Notebook-uri Jupyter,

Spark dataframe bazele

Spark dataframe operațiuni

SparkSQL ajută la reducerea decalajului pentru pyspark

magazine de date relaționale sunt ușor de a construi și de interogare. Utilizatorii și dezvoltatorii preferă adesea să scrie interogări declarative ușor de interpretat într-un limbaj lizibil asemănător omului, cum ar fi SQL. Cu toate acestea, pe măsură ce datele încep să crească în volum și varietate, abordarea relațională nu se scalează suficient de bine pentru a construi aplicații de date mari și sisteme analitice.

am avut succes în domeniul Big Data analytics cu Hadoop și paradigma MapReduce. Acest lucru a fost puternic, dar adesea lent, și a oferit utilizatorilor o interfață de programare procedurală de nivel scăzut, care impunea oamenilor să scrie mult cod chiar și pentru transformări de date foarte simple. Cu toate acestea, odată ce Spark a fost lansat, a revoluționat cu adevărat modul în care s-a făcut analiza Big Data, cu accent pe calculul în memorie, toleranța la erori, abstracții la nivel înalt și ușurința de utilizare.

Spark SQL încearcă în esență să reducă decalajul dintre cele două modele pe care le—am menționat anterior-modelele relaționale și procedurale. Spark SQL funcționează prin API-ul DataFrame care poate efectua operațiuni relaționale atât pe surse de date externe, cât și pe colecțiile distribuite încorporate Spark-la scară!

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Spark-SQL-1024x489.png

De ce este Spark SQL atât de rapid și optimizat? Motivul se datorează unui nou optimizator extensibil, Catalyst, bazat pe constructe de programare funcțională în Scala. Catalyst acceptă atât optimizarea bazată pe reguli, cât și pe costuri. În timp ce optimizatoarele extensibile au fost propuse în trecut, acestea au necesitat de obicei un limbaj complex specific domeniului pentru a specifica regulile. De obicei, acest lucru duce la o curbă semnificativă de învățare și o povară de întreținere. În schimb, Catalyst folosește caracteristici standard ale limbajului de programare Scala, cum ar fi potrivirea modelelor, pentru a permite dezvoltatorilor să utilizeze limbajul de programare complet, făcând în același timp Regulile ușor de specificat.

puteți consulta următorul Notebook Jupyter pentru o introducere în operațiunile bazei de date cu SparkSQL:

bazele operațiunilor bazei de date SparkSQL

cum veți utiliza PySpark în proiectul dvs.?

am acoperit fundamentele ecosistemului Apache Spark și modul în care funcționează împreună cu câteva exemple de utilizare de bază a structurii de date de bază RDD cu interfața Python PySpark. De asemenea, DataFrame și SparkSQL au fost discutate împreună cu link-uri de referință, de exemplu, notebook-uri de cod.

există atât de mult mai mult pentru a învăța și de a experimenta cu Apache Spark fiind folosit cu Python. Site–ul PySpark este o referință bună pentru a avea pe radar, și ei fac actualizări regulate și îmbunătățiri-așa că ține un ochi pe care.

și, dacă sunteți interesat să faceți învățare automată pe scară largă, distribuită cu Apache Spark, atunci verificați porțiunea MLLib a ecosistemului PySpark.

ți-a plăcut acest Blog PySpark? Asigurați – vă că pentru a verifica afară:

  • Privacy-Preserving Deep Learning – PySyft Versus TF-Encrypted

Accelerate-your-AI.-1024x202.jpg

Lasă un răspuns

Adresa ta de email nu va fi publicată. Câmpurile obligatorii sunt marcate cu *