Dyb læring, HPC, AV, Distribution & mere

Hvad er Apache Spark?

Apache Spark er en af de hotteste nye tendenser inden for teknologidomænet. Det er rammen med sandsynligvis det højeste potentiale til at realisere frugten af ægteskabet mellem Big Data og maskinindlæring.

det kører hurtigt (op til 100 gange hurtigere end traditionel Hadoop MapReduce på grund af in-memory-drift, tilbyder robuste, distribuerede, fejltolerante dataobjekter (kaldet RDD) og integreres smukt med en verden af maskinlæring og grafanalyse gennem supplerende pakker som Mlib og Graf.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-288x300.PNG

Spark er implementeret på Hadoop / HDFS og skrevet mest i Scala, et funktionelt programmeringssprog, der ligner Java. Faktisk har Scala brug for den nyeste Java-installation på dit system og kører på JVM. For de fleste begyndere er Scala imidlertid ikke et sprog, som de først lærer at vove sig ind i datalogiens verden. Heldigvis giver Spark en vidunderlig Python-integration, kaldet PySpark, som lader Python-programmører interface med Spark-rammen og lære at manipulere data i skala og arbejde med objekter og algoritmer over et distribueret filsystem.

i denne artikel lærer vi det grundlæggende i PySpark. Der er mange koncepter (konstant udvikling og introduceret), og derfor fokuserer vi bare på fundamentale med et par enkle eksempler. Læserne opfordres til at bygge videre på disse og udforske mere på egen hånd.

Apache Sparks korte historie

Apache Spark startede som et forskningsprojekt ved UC Berkeley AMPLab i 2009 og var åben i begyndelsen af 2010. Det var et klasseprojekt på UC Berkeley. Ideen var at opbygge en klyngestyringsramme, som kan understøtte forskellige former for klyngecomputersystemer. Mange af ideerne bag systemet blev præsenteret i forskellige forskningsartikler gennem årene. Efter at være blevet frigivet voksede Spark til et bredt udviklerfællesskab og flyttede til Apache Foundation i 2013. I dag er projektet udviklet i samarbejde med et fællesskab af hundredvis af udviklere fra hundredvis af organisationer.

Spark er ikke et programmeringssprog

en ting at huske er, at Spark ikke er et programmeringssprog som Python eller Java. Det er en generel distribueret databehandlingsmotor, der er egnet til brug under en lang række omstændigheder. Det er især nyttigt til stor databehandling både i skala og ved høj hastighed.

applikationsudviklere og dataforskere indarbejder generelt Spark i deres applikationer for hurtigt at forespørge, analysere og transformere data i skala. Nogle af de opgaver, der oftest er forbundet med Spark, inkluderer – ETL – og TRANSAKTIONSOPGAVER på tværs af store datasæt (ofte af terabyte af størrelse), – behandling af streamingdata fra IoT-enheder og noder, data fra forskellige sensorer, økonomiske og transaktionssystemer af alle slags og-maskinindlæringsopgaver til e-handel eller IT-applikationer.i sin kerne bygger Spark oven på Hadoop/HDFS-rammen til håndtering af distribuerede filer. Det er for det meste implementeret med Scala, en funktionel sprogvariant af Java. Der er en core Spark-databehandlingsmotor, men derudover er der mange biblioteker udviklet til FORESPØRGSELSANALYSE, distribueret maskinindlæring, storskala grafberegning og streaming af databehandling. Flere programmeringssprog understøttes af Spark i form af easy interface biblioteker: Java, Python, Scala og R.

Spark bruger MapReduce-paradigmet til distribueret behandling

grundideen med distribueret behandling er at opdele databunkerne i små håndterbare stykker (inklusive noget filtrering og sortering), bringe beregningen tæt på dataene, dvs.ved hjælp af små noder i en stor klynge til specifikke job og derefter kombinere dem igen. Den delende del kaldes ‘kort’ handling og rekombination kaldes ‘reducere’ handling. Sammen laver de det berømte ‘MapReduce’ paradigme, som blev introduceret af Google omkring 2004 (se det originale papir her).

hvis en fil f.eks. har 100 poster, der skal behandles, kan 100 kortlæggere køre sammen for at behandle en post hver. Eller måske 50 kortlæggere kan køre sammen for at behandle to poster hver. Efter at alle kortlæggere har afsluttet behandlingen, blandes rammen og sorterer resultaterne, inden de overføres til reduktionsanordningerne. En reducer kan ikke starte, mens en mapper stadig er i gang. Alle kortudgangsværdier, der har den samme nøgle, tildeles en enkelt reducer, som derefter aggregerer værdierne for den nøgle.

PE_TITAN-Blog-1024h127.jpg

Sådan opsættes PySpark

Hvis du allerede er bekendt med Python og biblioteker som Pandas og Numpy, så er PySpark en fantastisk udvidelse/ramme at lære for at skabe mere skalerbare, dataintensive analyser og rørledninger ved at udnytte gnistens kraft i baggrunden.

den nøjagtige proces med installation og opsætning af PySpark-miljø (på en enkeltstående maskine) er noget involveret og kan variere lidt afhængigt af dit system og miljø. Målet er at få dit almindelige Jupyter data science-miljø til at arbejde med Spark i baggrunden ved hjælp af PySpark-pakken.

denne artikel om Medium indeholder flere detaljer om trin-for-trin installationsprocessen.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-Setup.png

Alternativt kan du bruge Databricks setup til at øve Spark. Dette firma blev oprettet af de originale skabere af Spark og har et fremragende klar-til-lancering-miljø til at udføre distribueret analyse med Spark.

men ideen er altid den samme. Du distribuerer (og replikerer) dit store datasæt i små faste bidder over mange noder. Derefter bringer du beregningsmotoren tæt på dem, så hele operationen er paralleliseret, fejltolerant og skalerbar.

Ved at arbejde med PySpark og Jupyter notebook, kan du lære alle disse begreber uden at bruge noget på AV eller Databricks platform. Du kan også nemt interface med Mllib til databasemanipulation og maskinindlæring. Det vil være meget lettere at begynde at arbejde med virkelige store klynger, hvis du har internaliseret disse koncepter på forhånd!

elastisk distribueret datasæt (RDD) og Gnistkontekst

mange Gnistprogrammer drejer sig om begrebet et elastisk distribueret datasæt (RDD), som er en fejltolerant samling af elementer, der kan betjenes parallelt. Sparkkontekst findes i driverprogrammet og administrerer de distribuerede data over arbejdernoderne gennem cluster manager. Det gode ved at bruge PySpark er, at al denne kompleksitet af data partitionering og opgavestyring håndteres automatisk bagpå, og programmereren kan fokusere på selve det specifikke analyse-eller maskinlæringsjob.

fordelene-eksempler-af-brug-Apache-Spark-med-PySpark-in-Python-Rdd-1.png

rdd-1

Der er to måder at oprette RDDs på–parallelisering af en eksisterende samling i dit driverprogram eller henvisning til et datasæt i et eksternt lagringssystem, såsom et delt filsystem, HDFS, HBase eller enhver datakilde, der tilbyder en Hadoop InputFormat.

til illustration med en Python-baseret tilgang vil vi give eksempler på den første type her. Vi kan oprette et simpelt Python-array på 20 tilfældige heltal (mellem 0 og 10) ved hjælp af numpy random.randint (), og opret derefter et RDD-objekt som følger,

from pyspark import SparkContextimport numpy as npsc=SparkContext(master="local")lst=np.random.randint(0,10,20)A=sc.parallelize(lst)

Bemærk ‘4’ i argumentet. Det betegner 4 computerkerner (i din lokale maskine), der skal bruges til dette Gnistkontekstobjekt. Hvis vi kontrollerer typen af RDD-objektet, får vi følgende,

type(A)>> pyspark.rdd.RDD

modsat parallelisering er samlingen (med indsamle ()), som bringer alle de distribuerede elementer og returnerer dem til hovednoden.

A.collect()>> 

Men A er ikke længere et simpelt numpy array. Vi kan bruge glom () – metoden til at kontrollere, hvordan partitionerne oprettes.

A.glom().collect()>> , , , ]

stop nu SC og genindfør det med 2 kerner og se hvad der sker, når du gentager processen.

sc.stop()sc=SparkContext(master="local")A = sc.parallelize(lst)A.glom().collect()>> , ]

RDD er nu fordelt over to bidder, ikke fire!

du har lært om det første trin i distribueret dataanalyse, dvs. kontrol af, hvordan dine data er opdelt over mindre bidder til videre behandling

nogle eksempler på grundlæggende operationer med RDD & PySpark

>> 20

det første element (første) og de første par elementer (tage)

A.first()>> 4A.take(3)>> 

fjernelse af dubletter med brug af distinkt

Bemærk: Denne handling kræver en blanding for at registrere duplikering på tværs af partitioner. Så det er en langsom operation. Overdriv det ikke.

A_distinct=A.distinct()A_distinct.collect()>> 

for at opsummere bruger alle elementerne reduktionsmetoden

Bemærk brugen af en lambda-funktion i dette,

A.reduce(lambda x,y:x+y)>> 80

eller den direkte sum() metode

A.sum()>> 80

find maksimalt element ved at reducere

A.reduce(lambda x,y: x if x > y else y)>> 9

find det længste ord i en klat af tekst

words = 'These are some of the best Macintosh computers ever'.split(' ')wordRDD = sc.parallelize(words)wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)>> 'computers'

brug filteret til logikbaseret filtrering

# Return RDD with elements (greater than zero) divisible by 3A.filter(lambda x:x%3==0 and x!=0).collect()>> 

skriv regelmæssige Python-funktioner til at bruge med reducere()

def largerThan(x,y): """ Returns the last word among the longest words in a list """ if len(x)> len(y): return x elif len(y) > len(x): return y else: if x < y: return x else: return ywordRDD.reduce(largerThan)>> 'Macintosh'

Bemærk her < y gør en leksikografisk sammenligning og bestemmer, at Macintosh er større end computere!

Kortlægningsoperation med en lambda-funktion med PySpark

B=A.map(lambda x:x*x)B.collect()>> 

kortlægning med en almindelig Python-funktion i PySpark

def square_if_odd(x): """ Squares if odd, otherwise keeps the argument unchanged """ if x%2==1: return x*x else: return xA.map(square_if_odd).collect()>> 

groupby returnerer en RDD af grupperede elementer (iterable) i henhold til en given gruppeoperation

i det følgende eksempel bruger vi en listeforståelse sammen med gruppen for at oprette en liste over to elementer, der hver har en overskrift (resultatet af lambda-funktionen, enkel modulo 2 her) og en sorteret liste over de elementer, der gav anledning til dette resultat. Du kan nemt forestille dig, at denne form for adskillelse kan være særlig praktisk til behandling af data, der skal binned/dåse ud baseret på en bestemt operation udført over dem.

result=A.groupBy(lambda x:x%2).collect()sorted()>> ), (1, )]

brug af histogram

histogrammetoden() tager en liste over skraldespande/spande og returnerer en tuple med resultatet af histogrammet (binning),

B.histogram()>> (, )

Set operations

Du kan også gøre regelmæssige set operations på RDDs som – union(), kryds(), subtrahere () eller kartesisk().

tjek denne Jupyter notesbog for flere eksempler.

Deep Learning Ebook Hent

doven evaluering med PySpark (Og Caching)

doven evaluering er en evaluerings-/beregningsstrategi, der forbereder et detaljeret trin-for-trin Internt kort over eksekveringsrørledningen til en computeropgave, men som ikke er i stand til at forsinker den endelige udførelse, indtil det er absolut nødvendigt. Denne strategi er kernen i Spark for at fremskynde mange paralleliserede Big Data-operationer.

lad os bruge to CPU – kerner til dette eksempel,

sc = SparkContext(master="local")

lav en RDD med 1 million elementer

%%timerdd1 = sc.parallelize(range(1000000))>> CPU times: user 316 µs, sys: 5.13 ms, total: 5.45 ms, Wall time: 24.6 ms

some computing function-taketime

from math import cosdef taketime(x): return cos(x)

Kontroller, hvor meget tid er taget af taketime-funktion

%%timetaketime(2)>> CPU times: user 21 µs, sys: 7 µs, total: 28 µs, Wall time: 31.5 µs>> -0.4161468365471424

husk dette resultat, taketime () – funktionen tog en VÆGTID på 31,5 US. Selvfølgelig afhænger det nøjagtige antal af den maskine, du arbejder på.

gør nu kortoperationen på funktionen

%%timeinterim = rdd1.map(lambda x: taketime(x))>> CPU times: user 23 µs, sys: 8 µs, total: 31 µs, Wall time: 34.8 µs

hvordan kommer hver taketime-funktion 45.8 OS men kortoperationen med en 1 million elementer RDD tog også en lignende tid?

på grund af doven evaluering, dvs.intet blev beregnet i det foregående trin, blev der kun lavet en udførelsesplan. Variablen interim peger ikke på en datastruktur, men peger i stedet på en udførelsesplan, udtrykt som en Afhængighedsgraf. Afhængighedsgrafen definerer, hvordan RDD ‘ er beregnes ud fra hinanden.

den faktiske udførelse ved at reducere metode

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 11.6 ms, sys: 5.56 ms, total: 17.2 ms, Wall time: 15.6 s

så vægtiden her er 15,6 sekunder. Husk, at taketime () – funktionen havde en vægtid på 31,5 us? Derfor forventer vi, at den samlede tid er i størrelsesordenen ~ 31 sekunder for et 1-million array. På grund af parallel drift på to kerner tog det ~ 15 sekunder.

nu har vi ikke gemt (materialiseret) mellemliggende resultater i mellemtiden, så en anden simpel operation (f.eks. tæller elementer > 0) vil tage næsten samme tid.

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 10.6 ms, sys: 8.55 ms, total: 19.2 ms, Wall time: 12.1 s

Caching for at reducere beregningstiden på lignende drift (forbrugshukommelse)

husk afhængighedsgrafen, som vi byggede i det foregående trin? Vi kan køre den samme beregning som før Med cache-metoden for at fortælle afhængighedsgrafen at planlægge caching.

%%timeinterim = rdd1.map(lambda x: taketime(x)).cache()

den første beregning forbedres ikke, men den cacher det mellemliggende resultat,

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 16.4 ms, sys: 2.24 ms, total: 18.7 ms, Wall time: 15.3 s

Kør nu den samme filtermetode ved hjælp af cachelagret resultat,

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 14.2 ms, sys: 3.27 ms, total: 17.4 ms, Wall time: 811 ms

Åh! Computertiden kom ned til mindre end et sekund fra 12 sekunder tidligere! Denne måde, caching og parallelisering med doven udførelse, er kernen i programmering med Spark.

Dataframe og Sparkskl

bortset fra RDD er den anden nøgledatastruktur i Spark-rammen DataFrame. Hvis du har arbejdet med Python Pandas eller R DataFrame, kan konceptet virke velkendt.

en DataFrame er en distribueret samling af rækker under navngivne kolonner. Det svarer konceptuelt til en tabel i en relationsdatabase, et ark med kolonneoverskrifter eller en dataramme i R/Python, men med rigere optimeringer under hætten. DataFrames kan konstrueres fra en bred vifte af kilder, såsom strukturerede datafiler, tabeller i Hive, eksterne databaser eller eksisterende RDDs. Det deler også nogle fælles egenskaber med RDD:

  • uforanderlig i naturen: Vi kan oprette DataFrame / RDD en gang, men kan ikke ændre det. Og vi kan transformere en DataFrame / RDD efter anvendelse af transformationer.
  • dovne evalueringer: dette betyder, at en opgave ikke udføres, før en handling udføres. Distribueret: RDD og DataFrame er begge fordelt i naturen.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-DataFrame-in-PySpark.PNG

fordele ved DataFrame

  • DataFrames er designet til behandling af en stor samling af strukturerede eller semistrukturerede data.
  • observationer i Spark DataFrame er organiseret under navngivne kolonner, som hjælper Apache Spark til at forstå skemaet for en DataFrame. Dette hjælper Spark med at optimere eksekveringsplanen på disse forespørgsler.
  • DataFrame i Apache Spark har evnen til at håndtere petabyte data.
  • DataFrame understøtter en bred vifte af dataformater og kilder.
  • det har API support til forskellige sprog som Python, R, Scala, Java.

dataframe basics eksempel

for grundlæggende og typiske brugseksempler på Dataframer, se venligst følgende Jupyter Notebooks,

Spark DataFrame basics

Spark DataFrame operationer

Sparks hjælper med at bygge bro over kløften for PySpark

relationelle datalagre er nemme at opbygge og forespørge. Brugere og udviklere foretrækker ofte at skrive let at fortolke, deklarative forespørgsler på et menneskelignende læsbart sprog som f.eks. Imidlertid, da data begynder at stige i volumen og variation, den relationelle tilgang skalerer ikke godt nok til at opbygge Big Data-applikationer og analytiske systemer.

Vi har haft succes inden for Big Data analytics med Hadoop og MapReduce-paradigmet. Dette var kraftigt, men ofte langsomt, og gav brugerne et lavt niveau, proceduremæssig programmeringsgrænseflade, der krævede, at folk skrev en masse kode til selv meget enkle datatransformationer. Men når Spark blev frigivet, revolutionerede det virkelig den måde, hvorpå Big Data analytics blev udført med fokus på in-memory computing, fejltolerance, abstraktioner på højt niveau og brugervenlighed.Spark forsøger i det væsentlige at bygge bro mellem de to modeller, vi nævnte tidligere—relationelle og proceduremæssige modeller. Spark arbejder gennem DataFrame API, der kan udføre relationelle operationer på både eksterne datakilder og Sparks indbyggede distribuerede samlinger-i skala!

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Spark-SQL-1024x489.png

hvorfor er det så hurtigt og optimeret? Årsagen er på grund af en ny udvidelig optimeringsenhed, Catalyst, baseret på funktionelle programmeringskonstruktioner i Scala. Catalyst understøtter både regelbaseret og omkostningsbaseret optimering. Mens udvidelige optimatorer tidligere er blevet foreslået, har de typisk krævet et komplekst domænespecifikt sprog for at specificere regler. Normalt fører dette til at have en betydelig indlæringskurve og vedligeholdelsesbyrde. I modsætning hertil bruger Catalyst standardfunktioner i Scala programmeringssprog, såsom mønstertilpasning, for at lade udviklere bruge det fulde programmeringssprog, mens de stadig gør regler lette at specificere.

Du kan henvise til følgende Jupyter-notesbog for at få en introduktion til databaseoperationer med Sparks:

Sparks database operations basics

hvordan skal du bruge PySpark i dit projekt?

Vi dækkede de grundlæggende elementer i Apache Spark-økosystemet, og hvordan det fungerer sammen med nogle grundlæggende brugseksempler på kernedatastruktur RDD med Python-grænsefladen PySpark. Også, DataFrame og Sparks blev diskuteret sammen med reference links for eksempel kode notebooks.

der er så meget mere at lære og eksperimentere med Apache Spark bliver brugt med Python. Pysparks hjemmeside er en god reference at have på din radar, og de foretager regelmæssige opdateringer og forbedringer–så hold øje med det.

og hvis du er interesseret i at gøre storstilet, distribueret maskinindlæring med Apache Spark, så tjek MLLib-delen af PySpark-økosystemet.

har du nydt denne PySpark Blog? Sørg for at tjekke ud:

  • Privacy-Preserving Deep Learning – PySyft Versus TF-Encrypted

Accelerate-your-AI.-1024x202.jpg

Skriv et svar

Din e-mailadresse vil ikke blive publiceret. Krævede felter er markeret med *