Exxact / hluboké učení, HPC, AV, distribuce & více

co je Apache Spark?

Apache Spark je jedním z nejžhavějších nových trendů v oblasti technologií. Je to rámec s pravděpodobně nejvyšším potenciálem realizovat ovoce manželství mezi velkými daty a strojovým učením.

To běží rychle (až 100x rychleji než tradiční Hadoop MapReduce díky in-memory provoz, nabízí robustní, distribuované, odolný proti chybám dat objektů (tzv. RDD), a integruje krásně se světem strojového učení a graf analytics prostřednictvím doplňkové balíčky jako Mlib a GraphX.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-288x300.png

Spark je implementován na Hadoop / HDFS a psán většinou ve Scala, funkčním programovacím jazyce, podobném Javě. Ve skutečnosti Scala potřebuje nejnovější instalaci Java ve vašem systému a běží na JVM. Pro většinu začátečníků však Scala není jazyk, který se nejprve naučí, aby se pustili do světa datové vědy. Naštěstí, Zapalovací poskytuje nádherný Python integrace, tzv. PySpark, která umožňuje Python programátorům rozhraní s Jiskrou rámec a naučit se, jak manipulovat s daty v rozsahu a práce s objekty a algoritmy přes distribuovaný systém souborů.

v tomto článku se naučíme základy Pysparku. Existuje mnoho koncepcí (neustále vyvíjí a představil), a proto jsme se soustředili jen na základy s několika jednoduchými příklady. Čtenářům se doporučuje, aby na nich stavěli a zkoumali více sami.

Krátká historie Apache Spark

Apache Spark začala jako výzkumný projekt na UC Berkeley AMPLab v roce 2009 a byla otevřena počátkem roku 2010. Byl to třídní Projekt na UC Berkeley. Myšlenkou bylo vybudovat rámec pro správu klastrů, který může podporovat různé druhy klastrových výpočetních systémů. Mnoho myšlenek za systémem bylo v průběhu let prezentováno v různých výzkumných pracích. Po vydání se Spark rozrostl v širokou vývojářskou komunitu a v roce 2013 se přestěhoval do Apache Software Foundation. Dnes je projekt vyvíjen společně komunitou stovek vývojářů ze stovek organizací.

Spark není programovací jazyk

jedna věc k zapamatování je, že Spark není programovací jazyk jako Python nebo Java. Jedná se o univerzální distribuované zpracování dat motor, vhodný pro použití v širokém spektru okolností. Je zvláště užitečný pro zpracování velkých dat jak v měřítku, tak při vysoké rychlosti.

vývojáři Aplikací a dat vědci obecně začlenit Jiskru do svých aplikací, aby rychle, dotaz, analyzovat a transformovat data na stupnici. Některé úkoly, které jsou nejčastěji spojované s Jiskrou, zahrnout, – ETL a SQL batch pracovních míst v celé velké sady dat (často tb velikost), – zpracování streamování dat z IoT zařízení a uzly, data z různých senzorů, finanční a transakční systémy všeho druhu, a to – machine learning úkoly pro e-commerce nebo aplikací.

ve svém jádru Spark staví na horní části rámce Hadoop / HDFS pro zpracování distribuovaných souborů. Většinou je implementován s Scala, funkční jazykovou variantou Javy. K dispozici je jádro Spark zpracování dat motor, ale na vrcholu se, že existuje mnoho knihoven vyvinutých pro SQL typu analýzy dotazů, distribuované strojové učení, rozsáhlé grafické výpočty, a streaming zpracování dat. Spark podporuje více programovacích jazyků ve formě knihoven snadného rozhraní: Java, Python, Scala a R.

Spark Používá MapReduce Paradigmatu pro Distribuované Zpracování

základní myšlenkou distribuované zpracování je rozdělit na bloky dat do malé zvládnutelné kousky (včetně některých filtrování a třídění), výpočty v blízkosti data, tj. pomocí malých uzlin velkého clusteru pro konkrétní úlohy a pak re-spojit je zpět. Dělicí část se nazývá akce „Mapa“ a rekombinace se nazývá akce „snížit“. Společně vytvářejí slavné paradigma „MapReduce“, které Google představil kolem roku 2004 (viz původní dokument zde).

například, pokud má soubor 100 záznamů, které mají být zpracovány, 100 mappers může běžet společně zpracovat jeden záznam každý. Nebo možná 50 mapperů může běžet společně a zpracovat každý dva záznamy. Po dokončení zpracování všech mapovačů se framework zamíchá a třídí výsledky, než je předá reduktorům. Reduktor se nemůže spustit, zatímco mapovač stále probíhá. Všechny výstupní hodnoty mapy, které mají stejný klíč, jsou přiřazeny jednomu reduktoru, který pak agreguje hodnoty tohoto klíče.

PE_TITAN-RTX-Blog-1024x127.jpg

Jak Nastavit PySpark

Pokud jste již obeznámeni s Python a knihoven, jako jsou Pandy a Numpy, pak PySpark je skvělé rozšíření/rámec se učit s cílem vytvořit více škálovatelné, data-náročné analýzy a potrubí využitím energie Jiskry v pozadí.

přesný proces instalace a nastavení prostředí PySpark (na samostatném počítači) je poněkud zapojen a může se mírně lišit v závislosti na vašem systému a prostředí. Cílem je, aby se vaše pravidelné Jupyter data science prostředí pracuje s Spark v pozadí pomocí balíčku PySpark.

tento článek o médiu poskytuje další podrobnosti o procesu nastavení krok za krokem.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Apache-Spark-Setup.png

Alternativně můžete použít nastavení Databricks pro procvičování Spark. Tato společnost byla vytvořena původními tvůrci Spark a má vynikající prostředí připravené ke spuštění pro distribuovanou analýzu se Spark.

ale myšlenka je vždy stejná. Distribuujete (a replikujete) svůj velký datový soubor v malých pevných částech na mnoha uzlech. Ty pak přinášejí compute engine blízko k nim tak, že celá operace je paralelizovat, chyba-tolerantní a škálovatelné.

díky práci s notebookem PySpark a Jupyter se můžete naučit všechny tyto koncepty, aniž byste cokoli utráceli na platformě AWS nebo Databricks. Můžete také snadno propojit s SparkSQL a MLlib pro manipulaci s databází a strojové učení. Bude mnohem snazší začít pracovat s velkými klastry v reálném životě, pokud jste tyto koncepty předem internalizovali!

Resilient Distributed Dataset (RDD) a SparkContext

mnoho programů Spark se točí kolem konceptu resilient distributed dataset (RDD), což je kolekce prvků odolných proti chybám, které lze provozovat paralelně. SparkContext je umístěn v programu ovladače a spravuje distribuovaná data přes pracovní uzly prostřednictvím Správce clusteru. Dobrá věc, o použití PySpark je, že všechna ta složitost dat, rozdělování a řízení úkolů je zpracována automaticky v zadní a programátor se může soustředit na konkrétní analytics nebo strojového učení, práci sám.

výhody-příklady-použití-Apache-Spark-s-PySpark-in-Python-Rdd-1.png

rdd-1

Existují dva způsoby, jak vytvořit RDDs–paralizující existující sbírku ve vaší řidič programu nebo odkazování na objekt dataset v externí úložný systém, jako je sdílený soubor – systém HDFS, HBase, nebo jakýkoli zdroj dat nabízí Hadoop InputFormat.

pro ilustraci s přístupem založeným na Pythonu zde uvedeme příklady prvního typu. Můžeme vytvořit jednoduché Python pole 20 náhodných celých čísel (mezi 0 a 10), pomocí Numpy random.randint(), a pak vytvořit RDD objekt jako následující,

from pyspark import SparkContextimport numpy as npsc=SparkContext(master="local")lst=np.random.randint(0,10,20)A=sc.parallelize(lst)

Poznámka: „4“ v argumentu. Označuje 4 výpočetní jádra (ve vašem lokálním počítači), která mají být použita pro tento objekt SparkContext. Když se podíváme na typ RDD objekt, dostaneme následující,

type(A)>> pyspark.rdd.RDD

Naproti paralelizace je kolekce (s collect ()), která přináší všechny distribuované prvky a vrátí je na hlavy uzlu.

A.collect()>> 

ale A již není jednoduché Numpy pole. Pomocí metody glom() můžeme zkontrolovat, jak jsou oddíly vytvořeny.

A.glom().collect()>> , , , ]

nyní zastavte SC a znovu jej inicializujte pomocí 2 jader a podívejte se, co se stane, když proces opakujete.

sc.stop()sc=SparkContext(master="local")A = sc.parallelize(lst)A.glom().collect()>> , ]

RDD je nyní distribuován na dva kusy, ne čtyři!

dozvěděli jste se o prvním kroku v distribuované datové analytice, tj. že řídí, jak vaše data jsou distribuována přes menší kusy pro další zpracování.

Některé Příklady Základních Operací s RDD & PySpark

>> 20

první prvek (první) a několik prvních prvků (si)

A.first()>> 4A.take(3)>> 

Odstranění duplicit pomocí distinct

POZNÁMKA: Tato operace vyžaduje shuffle s cílem zjistit duplicity napříč oddíly. Takže je to pomalá operace. Nepřeháněj to.

A_distinct=A.distinct()A_distinct.collect()>> 

Abych to shrnul, všechny prvky pomocí snížení metoda

Všimněte si použití lambda funkce v tomto,

A.reduce(lambda x,y:x+y)>> 80

Nebo přímé sum() metoda

A.sum()>> 80

Nalezení maximálního prvku o snížení

A.reduce(lambda x,y: x if x > y else y)>> 9

Najít nejdelší slovo v blob text

words = 'These are some of the best Macintosh computers ever'.split(' ')wordRDD = sc.parallelize(words)wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)>> 'computers'

Použití filtru pro logika-založené filtrování

# Return RDD with elements (greater than zero) divisible by 3A.filter(lambda x:x%3==0 and x!=0).collect()>> 

Napsat pravidelné Python funkce pro použití s snížení()

def largerThan(x,y): """ Returns the last word among the longest words in a list """ if len(x)> len(y): return x elif len(y) > len(x): return y else: if x < y: return x else: return ywordRDD.reduce(largerThan)>> 'Macintosh'

Poznámka: zde x < y provede lexikografické srovnání a určí, že Macintosh je větší než počítače!

Mapování operace s lambda funkce s PySpark

B=A.map(lambda x:x*x)B.collect()>> 

Mapování s pravidelným Python funkcí v PySpark

def square_if_odd(x): """ Squares if odd, otherwise keeps the argument unchanged """ if x%2==1: return x*x else: return xA.map(square_if_odd).collect()>> 

seskupení vrací RDD skupinových prvků (iterable) dle dané skupinové operace

V následujícím příkladu používáme seznam-s porozuměním spolu se skupinou vytvořit seznam dvou prvků, z nichž každý má záhlaví (výsledek lambda funkce, jednoduché modulo 2), a seřazený seznam prvků, které vedly k tomuto výsledku. Můžete si snadno představit, že tento druh separace může přijít obzvlášť vhod pro zpracování dat, která musí být nesloučených/konzervy na základě konkrétní operace provádí nad nimi.

result=A.groupBy(lambda x:x%2).collect()sorted()>> ), (1, )]

Použití histogramu

histogram() metoda bere seznam koše/kbelíky a vrátí n-tice s výsledkem histogram (binning),

B.histogram()>> (, )

operace

můžete Si také udělat běžné množinové operace na RDDs jako unie(), průnik(), odčítání(), nebo kartézské().

podívejte se na tento notebook Jupyter pro více příkladů.

Hluboké Učení Ebook ke Stažení

Líné vyhodnocování s PySpark (a Cache)

Líný hodnocení/počítání strategie, která připravuje detailní krok-za-krokem vnitřní mapu provedení potrubí pro výpočetní úkol, ale oddaluje konečné provedení, až když to je nezbytně nutné. Tato strategie je jádrem Spark pro urychlení mnoha paralelních velkých datových operací.

použijeme dvě CPU jádra pro tento příklad,

sc = SparkContext(master="local")

Vytvořit RDD s 1 milion prvků

%%timerdd1 = sc.parallelize(range(1000000))>> CPU times: user 316 µs, sys: 5.13 ms, total: 5.45 ms, Wall time: 24.6 ms

Některé výpočetní funkce – taketime

from math import cosdef taketime(x): return cos(x)

Zkontrolovat, kolik času je pořízena taketime funkce

%%timetaketime(2)>> CPU times: user 21 µs, sys: 7 µs, total: 28 µs, Wall time: 31.5 µs>> -0.4161468365471424

Nezapomeňte tento výsledek, taketime() funkce vzala na zeď čas 31,5 nás. Přesný počet bude samozřejmě záviset na stroji, na kterém pracujete.

nyní proveďte operaci mapy na funkci

%%timeinterim = rdd1.map(lambda x: taketime(x))>> CPU times: user 23 µs, sys: 8 µs, total: 31 µs, Wall time: 34.8 µs

Jak to, že každá funkce taketime trvá 45.8 nás, ale mapová operace s 1 milionem prvků RDD také trvala podobný čas?

z důvodu líného vyhodnocení tj. v předchozím kroku nebylo nic vypočítáno, byl vytvořen pouze plán provedení. Proměnná interim neukazuje na datovou strukturu, místo toho ukazuje na plán provádění, vyjádřený jako graf závislosti. Graf závislostí definuje, jak jsou RDD vypočteny od sebe navzájem.

skutečné provedení metodou redukce

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 11.6 ms, sys: 5.56 ms, total: 17.2 ms, Wall time: 15.6 s

takže doba stěny je zde 15,6 sekundy. Pamatujte, že funkce taketime() měla čas stěny 31.5 us? Proto očekáváme, že celkový čas bude řádově ~ 31 sekund pro 1 milion pole. Kvůli paralelnímu provozu na dvou jádrech to trvalo ~ 15 sekund.

nyní jsme v mezidobí neuložili (zhmotnili) žádné průběžné výsledky, takže další jednoduchá operace (např. počítání prvků > 0) zabere téměř stejný čas.

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 10.6 ms, sys: 8.55 ms, total: 19.2 ms, Wall time: 12.1 s

ukládání do Mezipaměti pro snížení výpočetního času na podobné operace (výdaje paměti)

vzpomeňte si na grafu závislosti, které jsme vytvořili v předchozím kroku? Můžeme spustit stejný výpočet jako dříve pomocí metody mezipaměti, abychom řekli grafu závislostí, aby plánoval ukládání do mezipaměti.

%%timeinterim = rdd1.map(lambda x: taketime(x)).cache()

první výpočet nebude zlepšovat, ale to ukládá prozatímní výsledek,

%%timeprint('output =',interim.reduce(lambda x,y:x+y))>> output = -0.28870546796843666>> CPU times: user 16.4 ms, sys: 2.24 ms, total: 18.7 ms, Wall time: 15.3 s

spustit stejný filtr metoda s pomocí mezipaměti výsledek,

%%timeprint(interim.filter(lambda x:x>0).count())>> 500000>> CPU times: user 14.2 ms, sys: 3.27 ms, total: 17.4 ms, Wall time: 811 ms

Wow! Výpočetní čas klesl na méně než sekundu z 12 sekund dříve! Tímto způsobem je ukládání do mezipaměti a paralelizace s líným provedením základní funkcí programování se Spark.

Dataframe a SparkSQL

kromě RDD je druhá klíčová datová struktura v rámci Spark DataFrame. Pokud jste udělali práci s Python pandy nebo R DataFrame, koncept se může zdát povědomý.

datový rámec je distribuovaná sbírka řádků pod pojmenovanými sloupci. Je koncepčně ekvivalentní tabulce v relační databázi, listu Excelu se záhlaví sloupců nebo datovému rámci V R / Pythonu, ale s bohatšími optimalizacemi pod kapotou. Datové rámce mohou být konstruovány z široké škály zdrojů, jako jsou strukturované datové soubory, tabulky v úlu, externí databáze nebo existující RDD. To také sdílí některé společné charakteristiky s RDD:

  • neměnný v přírodě: můžeme vytvořit DataFrame / RDD jednou, ale nemůže změnit. A můžeme transformovat DataFrame / RDD po aplikaci transformací.
  • Lazy hodnocení: to znamená, že úkol není proveden, dokud není provedena akce. Distribuované: RDD a DataFrame jsou distribuovány v přírodě.

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-DataFrame-in-PySpark.png

Výhody Datovém

  • datové části rámců jsou určeny pro zpracování velkého sběru strukturovaných nebo semi-strukturovaná data.
  • Pozorování v Zapalovací Datovém jsou organizovány pod jménem sloupce, který pomáhá Apache Spark pochopit schéma Datovém. To pomáhá Spark optimalizovat plán provádění těchto dotazů.
  • DataFrame v Apache Spark má schopnost zpracovávat petabajty dat.
  • DataFrame má podporu pro širokou škálu datových formátů a zdrojů.
  • má podporu API pro různé jazyky, jako je Python, R, Scala, Java.

Datovém základy příklad:

Pro základy a typické příklady použití datové části rámců, prosím, viz následující poznámkové bloky Jupyter,

Spark Datovém základy

Spark Datovém operací

SparkSQL Pomáhá Překlenout Propast pro PySpark

Relační datové sklady jsou snadno stavět a dotaz. Uživatelé a vývojáři často dávají přednost psaní snadno interpretovatelných deklarativních dotazů v lidském čitelném jazyce, jako je SQL. Nicméně, protože údaje se zvětšuje objem a rozmanitost, relační přístup není měřítko dobře dost pro budování Velkých Datových aplikací a analytických systémů.

měli jsme úspěch v oblasti analýzy velkých dat s Hadoop a paradigmatem MapReduce. Tento byl silný, ale často pomalé, a dal uživatelům na nízké úrovni, procedurální programovací rozhraní, které vyžaduje, aby lidé psát spoustu kódu pro jednoduché datové transformace. Nicméně, jakmile Spark byl propuštěn, to opravdu revoluci způsob, jakým Big Data analytics bylo provedeno se zaměřením na in-memory computing, odolnost proti chybám, abstrakce na vysoké úrovni, a snadnost použití.

Spark SQL se v podstatě snaží překlenout propast mezi dvěma modely, které jsme zmínili dříve-relační a procedurální modely. Spark SQL pracuje prostřednictvím rozhraní DataFrame API, které může provádět relační operace jak na externích zdrojích dat, tak na vestavěných distribuovaných sbírkách Spark-v měřítku!

The-Benefits-Examples-of-Using-Apache-Spark-with-PySpark-in-Python-Spark-SQL-1024x489.png

proč je Spark SQL tak rychlý a optimalizovaný? Důvodem je nový rozšiřitelný optimalizátor, Catalyst, založený na funkčních programovacích konstrukcích ve Scale. Catalyst podporuje optimalizaci založenou na pravidlech i nákladech. Zatímco v minulosti byly navrženy rozšiřitelné optimalizátory, obvykle vyžadovaly komplexní jazyk specifický pro doménu, aby určily pravidla. Obvykle to vede k významné křivce učení a údržbě. V kontrastu, Katalyzátor používá standardní funkce Scala je programovací jazyk, jako pattern-matching, aby vývojáři použít plnou programovací jazyk, zatímco ještě dělat pravidla snadné určit.

úvod do databázových operací se SparkSQL naleznete v následujícím notebooku Jupyter:

základy databázových operací SparkSQL

jak budete ve svém projektu používat PySpark?

Jsme se vztahuje základy Apache Spark ekosystém a jak to funguje spolu s některými základními příklady použití základní datové struktury RDD s Python rozhraní PySpark. Také, DataFrame a SparkSQL byly diskutovány spolu s referenčními odkazy například kód notebooky.

je toho mnohem víc, co se naučit a experimentovat s Apache Spark používaným s Pythonem. Web PySpark je dobrým odkazem na váš radar, a pravidelně aktualizují a vylepšují–takže na to dávejte pozor.

a pokud máte zájem o rozsáhlé distribuované strojové učení s Apache Spark, podívejte se na MLLib část ekosystému PySpark.

Líbilo se vám tento blog PySpark? Nezapomeňte se podívat:

  • Privacy-Preserving Deep Learning – PySyft Versus TF-Encrypted

Accelerate-your-AI.-1024x202.jpg

Napsat komentář

Vaše e-mailová adresa nebude zveřejněna. Vyžadované informace jsou označeny *