Exxact / Deep Learning, HPC, AV, Distribution & More

mi az Apache Spark?

az Apache Spark a technológiai tartomány egyik legforróbb új trendje. Ez az a keretrendszer, amely valószínűleg a legnagyobb potenciállal rendelkezik a Big Data és a gépi tanulás közötti házasság gyümölcsének megvalósítására.

fut gyorsan (akár 100x gyorsabb, mint a hagyományos Hadoop MapReduce miatt memória művelet, kínál, robusztus, elosztott, hibatűrő data objects (az úgynevezett RDD), valamint integrálja gyönyörű a világ, a gépi tanulás, illetve grafikon analytics keresztül kiegészítő csomagok, mint Mlib, valamint GraphX.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-288x300.png

A Spark a Hadoop / HDFS-en valósul meg, többnyire Scala-ban, a Java-hoz hasonló funkcionális programozási nyelven. Valójában a Scala-nak szüksége van a legújabb Java telepítésre a rendszeren, majd a JVM-en fut. A legtöbb kezdő számára azonban a Scala nem olyan nyelv, amelyet először tanulnak meg az Adattudomány világába. Szerencsére, Spark nyújt egy csodálatos Python integráció, az úgynevezett PySpark, amely lehetővé teszi a Python programozók, hogy interfész a Spark keretrendszer, és megtanulják, hogyan kell manipulálni az adatokat a skála és a munka tárgyak és algoritmusok egy elosztott fájlrendszer.

ebben a cikkben megtanuljuk a PySpark alapjait. Sok koncepció létezik (folyamatosan fejlődik és bevezetésre kerül), ezért csak az alapokra koncentrálunk néhány egyszerű példával. Az olvasókat arra ösztönzik, hogy ezekre építsenek, és többet fedezzenek fel maguktól.

az Apache Spark

rövid története az Apache Spark kutatási projektként indult az UC Berkeley AMPLab-ban 2009-ben, és 2010 elején nyílt forráskódú volt. Ez egy osztály projekt UC Berkeley. Az ötlet egy klaszter menedzsment keretrendszer létrehozása volt, amely különféle klaszter számítástechnikai rendszereket támogat. A rendszer mögött álló ötletek közül sok az évek során különböző kutatási tanulmányokban került bemutatásra. A kiadás után a Spark széles fejlesztői közösséggé nőtte ki magát, majd 2013-ban az Apache Software Foundation-be költözött. Ma a projektet több száz szervezet több száz fejlesztőjének közössége dolgozza ki.

A Spark nem programozási nyelv

egy dolog megjegyezni, hogy a Spark nem olyan programozási nyelv, mint a Python vagy a Java. Ez egy általános célú elosztott adatfeldolgozó motor, amely sokféle körülmények között használható. Különösen hasznos a nagy adatfeldolgozáshoz mind méretarányban, mind nagy sebességgel.

az alkalmazásfejlesztők és az adattudósok általában beépítik a Spark-ot az alkalmazásaikba, hogy gyorsan lekérdezzék, elemezzék és méretarányosan átalakítsák az adatokat. A Sparkhoz leggyakrabban kapcsolódó feladatok közé tartozik a – ETL és az SQL kötegelt feladatok nagy adatkészletekben (gyakran terabájt méretben), – az IoT eszközökről és csomópontokról származó adatfolyamok feldolgozása, különböző érzékelők adatai, mindenféle pénzügyi és tranzakciós rendszer, valamint-gépi tanulási feladatok az e-kereskedelem vagy informatikai alkalmazások számára.

a Spark a Hadoop / HDFS keretrendszer tetejére épül az elosztott fájlok kezelésére. Leginkább a Scala-val, a Java funkcionális nyelvi változatával valósul meg. Van egy alapvető Spark adatfeldolgozó motor, de ezen felül számos könyvtárat fejlesztettek ki az SQL-típusú lekérdezéselemzéshez, az elosztott gépi tanuláshoz, a nagyméretű gráfszámításhoz és az adatfeldolgozáshoz. Több programozási nyelv támogatja Spark formájában egyszerű interfész könyvtárak: Java, Python, Scala, R.

Szikra Használja a MapReduce Paradigma az Elosztott Feldolgozás

Az alapötlet elosztott feldolgozás osztani az adatokat darabokat a kis kezelhető darab (beleértve a szűrés válogatás), hozza a számítás közel az adatokat, azaz a kis csomópontok egy nagy fürt számára speciális munkahelyek, majd újra össze őket vissza. Az osztási részt “Térkép” műveletnek, a rekombinációt “Reduce” műveletnek nevezzük. Együtt elkészítik a híres “MapReduce” paradigmát, amelyet a Google 2004 körül vezetett be (lásd az eredeti papírt itt).

például, ha egy fájlnak 100 feldolgozandó rekordja van, 100 Mapper futhat együtt egy rekord feldolgozásához. Vagy talán 50 a térképészek együtt futhatnak, hogy két rekordot dolgozzanak fel. Miután az összes leképezés befejeződött, a keret megkeveri az eredményeket, mielőtt továbbadná őket a reduktoroknak. A reduktor nem indul el, amíg a térképező még folyamatban van. Az összes olyan térkép kimeneti érték, amely ugyanazzal a kulccsal rendelkezik, egyetlen szűkítőhöz van hozzárendelve, amely ezután összesíti az adott kulcs értékeit.

PE_TITAN-RTX-Blog-1024x127.jpg

hogyan kell beállítani PySpark

Ha már ismeri a Python és könyvtárak, mint a pandák és Numpy, majd PySpark egy nagy kiterjesztés / keret, hogy megtanulják annak érdekében, hogy hozzon létre több skálázható, adatintenzív elemzések és csővezetékek kihasználva a hatalom Spark a háttérben.

a pontos folyamat telepítése és beállítása PySpark környezet (egy önálló gép) némileg részt, és kissé eltérhet attól függően, hogy a rendszer és a környezet. A cél az, hogy a rendszeres Jupyter data science környezetben dolgozó Spark a háttérben a PySpark csomag.

Ez a közepes cikk további részleteket tartalmaz a lépésről-lépésre történő beállítási folyamatról.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-Setup.PNG

Alternatív megoldásként a Databricks beállítását is használhatja a Spark gyakorlásához. Ezt a céget a Spark eredeti alkotói hozták létre, kiváló indítás előtti környezetük van a Spark elosztott elemzéséhez.

de az ötlet mindig ugyanaz. Nagy adatkészletét apró, rögzített részekben osztja el (és replikálja) több csomóponton keresztül. Ezután közel hozza a számítási motort hozzájuk, hogy az egész művelet párhuzamos, hibatűrő és skálázható legyen.

a PySpark és a Jupyter notebook használatával megtanulhatja ezeket a fogalmakat anélkül, hogy bármit költene az AWS vagy a Databricks platformon. Azt is könnyen interfész SparkSQL és MLlib adatbázis manipuláció és gépi tanulás. Sokkal könnyebb lesz a valós nagy klaszterekkel dolgozni, ha előzetesen internalizálta ezeket a fogalmakat!

rugalmas elosztott adatkészlet (RDD) és SparkContext

sok Spark program egy rugalmas elosztott adatkészlet (RDD) koncepciója körül forog, amely egy hibatűrő elemgyűjtemény, amely párhuzamosan működtethető. A SparkContext a Driver programban található, majd a klaszterkezelőn keresztül kezeli az elosztott adatokat a dolgozói csomópontokon keresztül. A PySpark használatával az a jó, hogy az adatparticionálás és a feladatkezelés ilyen bonyolultsága hátul automatikusan történik, a programozó pedig az adott analitikai vagy gépi tanulási feladatra összpontosíthat.

the-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Rdd-1.png

rdd-1

kétféle módon lehet RDDs–t létrehozni-egy meglévő gyűjtemény párhuzamosítása az illesztőprogram programjában, vagy egy külső tárolórendszer adatkészletének hivatkozása, például megosztott fájlrendszer, HDFS, HBase vagy bármilyen adatforrás, amely Hadoop InputFormat – ot kínál.

a Python alapú megközelítéssel történő illusztrációhoz itt adunk példákat az első típusra. Létrehozhatunk egy egyszerű Python tömböt 20 véletlenszerű egész számból (0 és 10 között), numpy random segítségével.randint (), majd hozzon létre egy RDD objektumot a következő,

from pyspark import SparkContextimport numpy as npsc=SparkContext(master="local")lst=np.random.randint(0,10,20)A=sc.parallelize(lst)

Megjegyzés A ” 4 ” az érv. 4 számítási magot jelöl (a helyi gépen), amelyeket ehhez a SparkContext objektumhoz kell használni. Ha ellenőrizzük az RDD objektum típusát, akkor a következőt kapjuk:

type(A)>> pyspark.rdd.RDD

a párhuzamosítással szemben a gyűjtemény (gyűjtéssel ())), amely az összes elosztott elemet hozza vissza a fej csomópontba.

A.collect()>> 

de a már nem egy egyszerű Numpy tömb. A Glom () módszerrel ellenőrizhetjük a partíciók létrehozásának módját.

A.glom().collect()>> , , , ]

most állítsa le az SC-t, majd indítsa újra 2 maggal, majd nézze meg, mi történik a folyamat megismétlésekor.

sc.stop()sc=SparkContext(master="local")A = sc.parallelize(lst)A.glom().collect()>> , ]

az RDDD most két részre oszlik, nem négyre!

megismerte az elosztott Adatelemzés első lépését, azaz kontrolling, hogy az adatok particionált át kisebb darabokat további feldolgozás

Néhány Példa az Alapvető Műveletek RDD & PySpark

>> 20

Az első elem (első), valamint az első néhány elemek (venni)

A.first()>> 4A.take(3)>> 

Eltávolítása másolatok használata különböző

MEGJEGYZÉS: Ez a művelet megkövetel egy shuffle felderítése érdekében, hogy a párhuzamos át a partíciókat. Tehát ez egy lassú művelet. Ne vigyük túlzásba.

A_distinct=A.distinct()A_distinct.collect()>> 

, Hogy az összeg, mind a elemek használata csökkenti a módszer

Megjegyzés: a használni egy lambda-függvény ebben,

A.reduce(lambda x,y:x+y)>> 80

Vagy a közvetlen szum() a módszer

A.sum()>> 80

Találni maximális elem csökkentésével

A.reduce(lambda x,y: x if x > y else y)>> 9

Megtalálni a leghosszabb szó, egy kis szöveg

words = 'These are some of the best Macintosh computers ever'.split(' ')wordRDD = sc.parallelize(words)wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)>> 'computers'

a szűrő segítségével a logikai alapú szűrés

# Return RDD with elements (greater than zero) divisible by 3A.filter(lambda x:x%3==0 and x!=0).collect()>> 

Írd rendszeres Python funkciók használata csökkentheti()

def largerThan(x,y): """ Returns the last word among the longest words in a list """ if len(x)> len(y): return x elif len(y) > len(x): return y else: if x < y: return x else: return ywordRDD.reduce(largerThan)>> 'Macintosh'

Megjegyezzük, hogy itt az x < y lexikográfiai összehasonlítást végez, és meghatározza, hogy a Macintosh nagyobb, mint a számítógépek!

Leképezés művelet egy lambda-függvény PySpark

B=A.map(lambda x:x*x)B.collect()>> 

Feltérképezése rendszeres Python funkció PySpark

def square_if_odd(x): """ Squares if odd, otherwise keeps the argument unchanged """ if x%2==1: return x*x else: return xA.map(square_if_odd).collect()>> 

groupby visszatér egy RDD csoportosított elemek (iterable), mint egy adott csoport művelet

a következő példa, használjuk a list-megértés együtt a csoport, hogy hozzon létre egy listát a két elem, mindegyik egy fejléc (az eredménye, hogy a lambda funkció, egyszerű modulo 2), valamint egy rendezett lista az elemek, amelyek az adott okot, hogy az eredmény. Könnyen elképzelhető, hogy ez a fajta szétválasztás különösen hasznos lehet az adatok feldolgozásához, amelyeket az általuk végzett műveletek alapján kell binned/konzerválni.

result=A.groupBy(lambda x:x%2).collect()sorted()>> ), (1, )]

hisztogram használatával

a hisztogram() módszer a tartályok/vödrök listáját veszi fel, és a hisztogram (binning),

B.histogram()>> (, )

műveletek beállítása

az rddds like – union (), metszéspont (), kivonás () vagy derékszögű().

nézze meg ezt Jupyter notebook további példákat.

Mély Tanulás Ebook Letöltés

Lusta értékelést PySpark (pedig Caching)

Lusta értékelés értékelő/számítás stratégia, amely előkészíti részletes, lépésről-lépésre belső térkép a kivégzés csővezeték egy számítási feladat, de késlelteti az utolsó kivégzést, míg, ha feltétlenül szükséges. Ez a stratégia középpontjában Spark felgyorsítása sok párhuzamosított nagy adat műveletek.

használjunk két CPU magok ebben a példában,

sc = SparkContext(master="local")

egy RDD 1 millió elemek

%%timerdd1 = sc.parallelize(range(1000000))>> CPU times: user 316 µs, sys: 5.13 ms, total: 5.45 ms, Wall time: 24.6 ms

Valamilyen számítási funkció – taketime

from math import cosdef taketime(x): return cos(x)

nézd meg, mennyi az idő által hozott taketime funkció

%%timetaketime(2)>> CPU times: user 21 µs, sys: 7 µs, total: 28 µs, Wall time: 31.5 µs>> -0.4161468365471424

Emlékszem, hogy ezt az eredményt, a taketime() függvény vett fal idő 31.5 minket. Természetesen a pontos szám attól függ, hogy milyen gépen dolgozik.

most végezze el a térképműveletet a

%%timeinterim = rdd1.map(lambda x: taketime(x))>> CPU times: user 23 µs, sys: 8 µs, total: 31 µs, Wall time: 34.8 µs

hogyan jön minden taketime funkció 45.8 minket, de a térkép működését egy 1 millió elem RDD is vett egy hasonló idő?

a lusta értékelés miatt, azaz az előző lépésben semmit sem számítottak ki, csak egy végrehajtási tervet készítettek. Az interim változó nem mutat adatszerkezetre, hanem egy végrehajtási tervre mutat, függőségi grafikonként kifejezve. A Függőségi grafikon meghatározza, hogyan számítják ki az RDD-ket egymástól.

a tényleges végrehajtás redukáló módszerrel

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 11.6 ms, sys: 5.56 ms, total: 17.2 ms, Wall time: 15.6 s

tehát a falidő itt 15,6 másodperc. Ne feledje, hogy a taketime () funkció falideje 31,5 us volt? Ezért arra számítunk, hogy a teljes idő ~ 31 másodperces sorrendben lesz egy 1 millió tömb esetében. A két mag párhuzamos működése miatt ~ 15 másodpercet vett igénybe.

most már nem mentettünk (materializáltuk) közbenső eredményeket az időközi, így egy másik egyszerű művelet (például a számlálási elemek > 0) majdnem ugyanabban az időben történik.

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 10.6 ms, sys: 8.55 ms, total: 19.2 ms, Wall time: 12.1 s

gyorsítótárazás a számítási idő csökkentése érdekében hasonló műveleteken (kiadási memória)

emlékezzen az előző lépésben épített függőségi grafikonra? Futtathatjuk ugyanazt a számítást, mint korábban a gyorsítótár módszerrel, hogy megmondjuk a függőségi grafikont a gyorsítótár tervezéséhez.

%%timeinterim = rdd1.map(lambda x: taketime(x)).cache()

az első számítás nem javul, de gyorsítótárazza az időközi eredményt,

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 16.4 ms, sys: 2.24 ms, total: 18.7 ms, Wall time: 15.3 s

most futtassa ugyanazt a szűrési módszert gyorsítótárazott eredmény segítségével,

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 14.2 ms, sys: 3.27 ms, total: 17.4 ms, Wall time: 811 ms

Wow! A számítási idő kevesebb, mint egy másodpercre csökkent 12 másodperccel korábban! Így a gyorsítótárazás és a lusta végrehajtással való párhuzamosítás a Spark programozás alapvető jellemzője.

Dataframe és SparkSQL

az RDD-n kívül a Spark keretrendszer második legfontosabb adatstruktúrája a DataFrame. Ha már végzett munkát Python pandák vagy R DataFrame, a koncepció ismerősnek tűnhet.

a DataFrame a sorok elosztott gyűjteménye a megnevezett oszlopok alatt. Fogalmilag egyenértékű egy relációs adatbázis táblájával, egy Oszlopfejlécekkel ellátott Excel-lappal vagy egy R/Python adatkerettel, de gazdagabb optimalizálással a motorháztető alatt. DataFrames lehet kialakítani a legkülönbözőbb forrásokból, mint a strukturált adatfájlok, táblázatok Hive, külső adatbázisok, vagy a meglévő RDD-k. Azt is osztja néhány közös jellemzői RDD:

  • megváltoztathatatlan jellegű: tudjuk létrehozni DataFrame / RDD egyszer, de nem tudja megváltoztatni. Átalakíthatunk egy Adatframe / RDD-t transzformációk alkalmazása után.
  • lusta értékelések: ez azt jelenti, hogy a feladatot nem hajtják végre, amíg egy műveletet nem hajtanak végre. Elosztott: az RDD és az DataFrame egyaránt eloszlik a természetben.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-DataFrame-in-PySpark.png

A DataFrame

  • DataFrames előnyei strukturált vagy félig strukturált adatok nagy gyűjteményének feldolgozására szolgálnak.
  • A Spark DataFrame megfigyelései a megnevezett oszlopok alatt vannak rendezve, ami segít az Apache Sparknak megérteni egy Adatframe sémáját. Ez segít Spark optimalizálja a végrehajtási terv ezeket a lekérdezéseket.
  • DataFrame az Apache Spark képes kezelni petabájt adat.
  • DataFrame támogatja a széles körű adatformátumok és források.
  • ez API támogatása különböző nyelveken, mint a Python, R, Scala, Java.

DataFrame alapjai példa

a fundamentumok és a tipikus használati példák DataFrames, kérjük, olvassa el a következő Jupyter Notebook,

Spark DataFrame alapjai

Spark DataFrame műveletek

SparkSQL segít áthidalni a szakadékot PySpark

relációs adattárolók könnyen építhető és lekérdezhető. A felhasználók és a fejlesztők gyakran inkább a könnyen értelmezhető, deklaratív lekérdezések írását részesítik előnyben olyan emberszerű, olvasható nyelven, mint az SQL. Mivel azonban az adatok mennyisége és változatossága növekszik, a relációs megközelítés nem elég jól méretezhető a nagy Adatalkalmazások és analitikai rendszerek kiépítéséhez.

sikerrel jártunk a Big Data analytics területén a Hadoop és a MapReduce paradigmával. Ez erős volt, de gyakran lassú, és adott a felhasználóknak egy alacsony szintű, procedurális programozási felület, amely megkövetelte az embereket, hogy írjon egy csomó kódot még nagyon egyszerű adattranszformációk. Azonban, miután a Spark megjelent, ez tényleg forradalmasította a Big Data analytics volt, amelynek középpontjában a memória Számítástechnika, hibatűrés, magas szintű absztrakciók, valamint a könnyű használat.

A Spark SQL lényegében megpróbálja áthidalni a korábban említett két modell—a relációs és procedurális modellek-közötti szakadékot. A Spark SQL a DataFrame API-n keresztül működik, amely relációs műveleteket végezhet mind a külső adatforrásokon, mind a Spark beépített elosztott gyűjteményein-méretarányosan!

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Spark-SQL-1024x489.png

miért olyan gyors és optimalizált a Spark SQL? Ennek oka egy új bővíthető optimalizáló, katalizátor, amely a Scala funkcionális programozási konstrukcióin alapul. A Catalyst támogatja mind a szabályalapú, mind a költségalapú optimalizálást. Míg a bővíthető optimalizálók már javasolt a múltban, ők általában szükséges egy komplex domain-specifikus nyelvet, hogy meghatározza a szabályokat. Ez általában jelentős tanulási görbéhez és karbantartási terhekhez vezet. Ezzel szemben a Catalyst a Scala programozási nyelv szabványos funkcióit használja, például a mintaillesztést, hogy a fejlesztők a teljes programozási nyelvet használhassák, miközben továbbra is megkönnyítik a szabályok meghatározását.

akkor olvassa el a következő Jupyter notebook egy bevezetés adatbázis műveletek SparkSQL:

SparkSQL adatbázis műveletek alapjai

hogyan fogja használni PySpark a projekt?

lefedtük az Apache Spark ökoszisztéma alapjait, valamint azt, hogy hogyan működik, valamint néhány alapvető felhasználási példát a Python interface PySpark rddd alapvető adatstruktúrájára. Emellett a DataFrame-et és a SparkSQL-t is megvitatták referenciakapcsolatokkal, például kód noteszgépekkel együtt.

sokkal többet lehet megtanulni és kísérletezni az Apache Spark Python használatával. A PySpark honlap egy jó utalás, hogy a radar, és teszik a rendszeres frissítések és fejlesztések – így tartsa szemmel, hogy.

és ha érdekli az Apache Spark segítségével végzett nagyszabású, elosztott gépi tanulás, akkor nézze meg a PySpark ökoszisztéma Mlib részét.

élvezte ezt a PySpark blogot? Ügyeljen arra, hogy nézd meg:

  • Privacy-Preserving Deep Learning – PySyft Versus TF-Encrypted

Accelerate-your-AI.-1024x202.jpg

Vélemény, hozzászólás?

Az e-mail-címet nem tesszük közzé. A kötelező mezőket * karakterrel jelöltük