本文節(jié)選自CCF大數(shù)據(jù)教材系列叢書之《大數(shù)據(jù)處理》,本書由華中科技大學(xué)金海教授主編,包括大數(shù)據(jù)處理基礎(chǔ)技術(shù)、大數(shù)據(jù)處理編程與典型應(yīng)用處理、大數(shù)據(jù)處理系統(tǒng)與優(yōu)化三個(gè)方面。本教材以大數(shù)據(jù)處理編程為核心,從基礎(chǔ)、編程到優(yōu)化等多個(gè)方面對大數(shù)據(jù)處理技術(shù)進(jìn)行系統(tǒng)介紹,使得讀者能夠快速入門,同時(shí)體會大數(shù)據(jù)處理系統(tǒng)的設(shè)計(jì)理念與優(yōu)化方法本質(zhì)。
開源系統(tǒng)及編程模型
基于流計(jì)算的基本模型,當(dāng)前已有各式各樣的分布式流處理系統(tǒng)被開發(fā)出來。本節(jié)將對當(dāng)前開源分布式流處理系統(tǒng)中三個(gè)最典型的代表性的系統(tǒng):Apache Storm,Spark Streaming,Apache Flink以及它們的編程模型進(jìn)行詳細(xì)介紹。
Apache Storm
Apache Storm是由Twitter公司開源的一個(gè)實(shí)時(shí)分布式流處理系統(tǒng)[2],被廣泛應(yīng)用在實(shí)時(shí)分析、在線機(jī)器學(xué)習(xí)連續(xù)計(jì)算、分布式RPC、ETL等場景。Storm支持水平擴(kuò)展、具有高容錯性,保證數(shù)據(jù)能被處理,而且處理速度很快。Storm支持多種編程語言,易于部署和管理,是目前廣泛使用的流處理系統(tǒng)之一。
一、Storm中的數(shù)據(jù)封裝
Storm系統(tǒng)可以從分布式文件系統(tǒng)(如HDFS)或分布式消息隊(duì)列(如Kafka)中獲取源數(shù)據(jù),并將每個(gè)流數(shù)據(jù)元組封裝稱為tuple。一條數(shù)據(jù)流即是一個(gè)無邊界的tuple序列,而這些tuple序列可以以分布式的方式創(chuàng)建和處理。在Storm中,數(shù)據(jù)流中的每個(gè)tuple相互獨(dú)立,彼此間的處理上不存在任何關(guān)聯(lián)。Tuple也是Storm中消息傳遞的基本單元,其數(shù)據(jù)結(jié)構(gòu)如圖5-3-1所示。
如圖5-3-1所 示, 一 個(gè)tuple可以包含多個(gè)字段(field),每個(gè)字段代表對應(yīng)流數(shù)據(jù)的一個(gè)屬性,在Storm的每個(gè)操作組件發(fā)送向下游發(fā)送tuple時(shí),會聲明對應(yīng)tuple每個(gè)字段的順序和代表的含義(如數(shù)據(jù)的鍵、值、時(shí)間戳等)。
二、Storm中的應(yīng)用拓?fù)浣?/p>
在Storm中, 用 戶 所 提 交 的 應(yīng) 用 所 構(gòu) 建 的DAG拓?fù)浔环Q為Topology。Storm的Topology類似于MapReduce中的一個(gè)job,但區(qū)別在于這個(gè)拓?fù)鋾肋h(yuǎn)運(yùn)行(或者直到手動結(jié)束)。每個(gè)Topology中有兩個(gè)重要組件:spout和bolt。
spout是Topology中數(shù)據(jù)流的來源,也即對應(yīng)DAG模型中的起始操作。spout可以從外部源讀取數(shù)據(jù)并將其以封裝成tuple的形式發(fā)送到圖 5-3-1 tuple的數(shù)據(jù)結(jié)構(gòu)Topology中。bolt是Topology中對tuple進(jìn)行處理的主要單元。Storm并不區(qū)分中間和終止操作,而是將其統(tǒng)一為bolt來進(jìn)行實(shí)現(xiàn),也即對結(jié)果的輸出需要由用戶自己來實(shí)現(xiàn)。所有對流數(shù)據(jù)的處理都是在bolt中實(shí)現(xiàn),bolt可以執(zhí)行各種基礎(chǔ)操作,如過濾、聚合、連接等。bolt每處理完一個(gè)tuple后,可以按照應(yīng)用需求發(fā)送給0個(gè)或多個(gè)tuple給下游的bolt。
三、Storm中的并行度指定
Storm中的并行度有三層含義。首先是worker進(jìn)程數(shù)。Storm可以建立在分布式集群上,每臺物理節(jié)點(diǎn)可以發(fā)起一個(gè)或多個(gè)worker進(jìn)程。
一個(gè)worker對應(yīng)一個(gè)物理的JVM(Java虛擬機(jī))。通常,整個(gè)Topology會由一個(gè)或者多個(gè)worker進(jìn)程來負(fù)責(zé)執(zhí)行。每個(gè)worker會在一個(gè)JVM中運(yùn)行一個(gè)或多個(gè)executor,每個(gè)executor對應(yīng)一個(gè)線程,執(zhí)行某一個(gè)spout或者bolt的計(jì)算任務(wù)。在Storm中,每個(gè)spout/bolt都可以實(shí)例化生成多個(gè)task在集群中運(yùn)行,一般默認(rèn)情況下,executor數(shù)與task數(shù)一一對應(yīng),也即每個(gè)實(shí)例都由一個(gè)單獨(dú)的線程來執(zhí)行。用戶也可以指定task數(shù)大于executor數(shù),這時(shí)部分task會由同一個(gè)線程循環(huán)調(diào)用來執(zhí)行。在Storm的Topology建立時(shí),用戶可以根據(jù)需要依次來設(shè)定整體的worker進(jìn)程數(shù)以及每個(gè)spout/bolt對應(yīng)的executor數(shù)和task數(shù)。
四、Storm中的數(shù)據(jù)分組和傳輸
用戶可以通過定義分組策略(streaming grouping)來決定數(shù)據(jù)流如何在不同的spout/bolt的task中進(jìn)行分發(fā)和傳輸。分組策略將所有的spout和bolt連接起來構(gòu)成一個(gè)Topology,如圖5-3-2所示。除了5.2.4節(jié)所介紹的幾種基本分組策略外,Storm還支持其他的分組策略。例如local grouping,這是shufflegrouping的 一 種 變 種 分 組 策 略。由于Storm劃分多個(gè)worker進(jìn)程,shuffle grouping可能導(dǎo)致大量的進(jìn)程間通信,local grouping則是將元組優(yōu)先發(fā)往與自己同進(jìn)程的下游task中,若沒有這種下游task,才繼續(xù)沿用shuffle grouping的方式。
圖 5-3-2 streaming grouping
圖 5-3-3 Storm系統(tǒng)架構(gòu)
又如direct grouping,這是一種特殊的分組方式,用戶可以直接指定由下游的哪一個(gè)task來接收數(shù)據(jù)。
五、Storm的分布式系統(tǒng)架構(gòu)
Storm可以運(yùn)行在分布式集群上。Storm集群結(jié)構(gòu)沿用了主從架構(gòu)方式,即一個(gè)主控節(jié)點(diǎn)和多個(gè)工作節(jié)點(diǎn)。圖5-3-3展示了整個(gè)Storm的系統(tǒng)架構(gòu)。
Storm的基本組件分別如下。
Nimbus:運(yùn)行Nimbus的節(jié)點(diǎn)是Storm集群的主控節(jié)點(diǎn),Nimbus類似Hadoop中JobTracker的角色,是用戶和Storm系統(tǒng)之間的交互點(diǎn)。Nimbus主要的工作是用于用戶提交Topology、進(jìn)行集群任務(wù)的分配調(diào)度、進(jìn)行集群監(jiān)控和統(tǒng)計(jì)等。
Supervisor:每個(gè)工作節(jié)點(diǎn)上都運(yùn)行著一個(gè)Supervisor,Supervisor用于接收Nimbus分配的任務(wù),并根據(jù)分配產(chǎn)生worker進(jìn)程。同時(shí)它還會監(jiān)視worker的健康狀況,在必要的情況下會重啟worker進(jìn)程。
ZooKeeper:Storm系統(tǒng)借用Zookeeper集群來進(jìn)行Nimbus和Supervisor之間的所有協(xié)調(diào)工作,包括Nimbus對Supervisor所執(zhí)行的任務(wù)的調(diào)配,以及幫助Nimbus監(jiān)控所有Supervisor和task的運(yùn)行情況,以便失效時(shí)迅速重啟。
六、Storm的編程示例
下面,以一個(gè)簡單的WordCount應(yīng)用為例,來進(jìn)行Storm編程模型的講解。
(1)實(shí)現(xiàn)生成數(shù)據(jù)的spout,封裝數(shù)據(jù)首先構(gòu)建一個(gè)CreateSentenceSpout來進(jìn)行數(shù)據(jù)流的生成。為了簡化說明,從若干給定的靜態(tài)句子列表中每次隨機(jī)抽取一句作為一個(gè)tuple來傳遞給下游bolt進(jìn)行處理。CreateSentenceSpout的具體實(shí)現(xiàn)如代碼5-3-1所示。
以上代碼中,BaseRichSpout類是Storm提供的一個(gè)簡單接口,使用它可以默認(rèn)實(shí)現(xiàn)很多方法,使用戶只用關(guān)心實(shí)現(xiàn)應(yīng)用所需要的代碼上去。
在spout中最主要的工作就是數(shù)據(jù)的封裝。Spout的核心代碼在nextTuple( )方法中實(shí)現(xiàn),即如何產(chǎn)生所需的tuple并進(jìn)行傳輸。Spout會循環(huán)調(diào)用此方法來不斷產(chǎn)生新的tuple。在本例中,從open( )方法里給定的句子列表中隨機(jī)抽取一條作為tuple,并通過emit方法將tuple進(jìn)行傳輸。
在emit生成tuple時(shí),還需要對tuple中的每個(gè)字段進(jìn)行聲明。這是由declareOutputFields( )方法來實(shí)現(xiàn)。這個(gè)方法是Storm中所有spout/bolt都需要實(shí)現(xiàn)的一個(gè)方法。在本例中,生成的每個(gè)句子對應(yīng)一個(gè)tuple,其只具有一個(gè)字段,字段的值就是句子本身,因此在declareOutputFields( )中聲明字段只有一個(gè)“sentence”。open( )方法是對應(yīng)組件在進(jìn)行初始化時(shí)執(zhí)行的方法,其中要注意的是open( )方法會接收SpoutOutputCollector對象所提供的后續(xù)tuple傳輸方法作為參數(shù),因此在open( )方法的實(shí)現(xiàn)中,需要將其引用保存在一個(gè)變量當(dāng)中,以便nextTuple( )方法調(diào)用。
(2)實(shí)現(xiàn)對流數(shù)據(jù)進(jìn)行操作處理的bolt
在WordCount應(yīng)用中,對spout生成的句子,構(gòu)建兩個(gè)bolt來進(jìn)行處理:一個(gè)SplitWordBolt來將句子劃分為單詞,一個(gè)CountBolt來對劃分好的單詞進(jìn)行累計(jì)計(jì)數(shù)。下面,以SplitWordBolt為例來進(jìn)行講解,其實(shí)現(xiàn)代碼如代碼5-3-2所示。
BasicRichBolt類同樣是Storm對于bolt類提供的一個(gè)簡單接口,使用戶能夠僅集中編寫所需的操作邏輯即可。
Bolt的核心是execute( )方法。每當(dāng)接收到一個(gè)新的tuple,都會直接對此方法進(jìn)行調(diào)用,然后執(zhí)行。使用getStringByField( )方法可以讀取在上游組件生成tuple時(shí)聲明的對應(yīng)字段里的值。當(dāng)完成處理后,如果新產(chǎn)生的tuple需要繼續(xù)向后傳輸,可以通過調(diào)用emit方法對tuple進(jìn)行發(fā)送。
prepare( )方法與spout中 的open( )方 法 功 能 相 似。Declare-OutputFields( )方法與spout( )中相同,這里不再贅述。
(3)構(gòu)建流應(yīng)用Topology,并指明并行度和分組策略
實(shí)現(xiàn)了對應(yīng)的spout和bolt功能之后,最后就是將其連接成一個(gè)完整的Topology。本例中Topology的代碼如代碼5-3-3所示。
以上代碼中,首先生成了TopologyBuilder的一個(gè)實(shí)例,然后分別對應(yīng)生成spout和bolt的各個(gè)實(shí)例。在setSpout和setBolt方法中,第一個(gè)參數(shù)為對應(yīng)的組件注冊了ID,第二個(gè)參數(shù)生成對應(yīng)組件的實(shí)例,而第三個(gè)參數(shù)為對應(yīng)組件需要生成的executor個(gè)數(shù)。
在setBolt方法中,除了對應(yīng)生成實(shí)例外,還需要指定每個(gè)bolt需要接收哪個(gè)組件發(fā)送給自己的數(shù)據(jù),以及數(shù)據(jù)的發(fā)送方式,即分組策略。例如本例中,CountBolt需要從SplitWordBolt處接收數(shù)據(jù),SplitWordBolt發(fā)送的數(shù)據(jù)以fields grouping( 同key grouping) 的方式進(jìn)行發(fā)送,其中用于分組的鍵值為SplitWordBolt發(fā)送tuple的“word”字段的值。
最后,可以自由指定程序的并行度??梢允褂胹etNumWorkers方法來指定用于執(zhí)行此Topology中worker進(jìn)程的個(gè)數(shù),本例中為整個(gè)Topology分配了4個(gè)worker進(jìn)程;可以用setSpout和setBolt方法中的第三個(gè)參數(shù)指定executor數(shù)量,而若需要指定更多的task數(shù),則可以繼續(xù)使用setNumTasks進(jìn)行設(shè)定。本例為每個(gè)spout/bolt都生成了4個(gè)executor,而進(jìn)一步為SplitWordBolt分配了8個(gè)task,這使得每2個(gè)task由一個(gè)executor線程來負(fù)責(zé)執(zhí)行。
Spark Streaming
Spark Streaming是Spark API核心擴(kuò)展,提供對實(shí)時(shí)數(shù)據(jù)流進(jìn)行流式處理,具備可擴(kuò)展、高吞吐和容錯等特性。Spark Streaming支持從多種數(shù)據(jù)源中提取數(shù)據(jù),例如Twitter、Kafka、Flume、ZeroMQ和TCP套接字,并提供了一些高級的API來表示復(fù)雜處理算法,如map、reduce、join、windows等,最后可以將得到的結(jié)果存儲到分布式文件系統(tǒng)(如HDFS)、數(shù)據(jù)庫或者其他輸出,Spark的機(jī)器學(xué)習(xí)和圖計(jì)算的算法也可以應(yīng)用于Spark Streaming的數(shù)據(jù)流中。
一、Spark Streaming中的數(shù)據(jù)封裝
和Storm不同的是,Spark Streaming本質(zhì)上是一個(gè)典型的微批處理系統(tǒng),其與以元組為單位進(jìn)行流式處理不同,它將無盡的數(shù)據(jù)流按時(shí)間切分為連續(xù)的小批次數(shù)據(jù),然后以傳統(tǒng)的批處理方法來進(jìn)行快速連續(xù)的處理。
在Spark Streaming中,數(shù)據(jù)流被抽象成以時(shí)間片段分隔開的離散流(discretized stream)形式。簡單而言,就是將所有的流數(shù)據(jù)按照一定的批大?。ㄈ?秒)分割成一段又一段的小批次數(shù)據(jù),如圖5-3-4所示。Spark Streaming使用Spark引擎,將每一段小批次數(shù)據(jù)轉(zhuǎn)化成為Spark當(dāng)中的RDD(彈性分布式數(shù)據(jù)集)。流數(shù)據(jù)即以RDD的形式在Spark Streaming系統(tǒng)中進(jìn)行運(yùn)算。
圖 5-3-4 Spark Streaming的離散流
二、Spark Streaming中的應(yīng)用拓?fù)浣?/p>
Spark Streaming同樣在系統(tǒng)中構(gòu)建出DAG的處理模型。不過與Storm不同,Spark Streaming并不使用固定的處理單元來執(zhí)行單一的操作。實(shí)際上,Spark Streaming中的DAG與Spark Core中的DAG相同,只是用DAG的形式將每一個(gè)時(shí)間分片對應(yīng)的RDD進(jìn)行運(yùn)算的job來進(jìn)一步劃分成任務(wù)集stage,以便進(jìn)行高效的批處理。Spark Streaming沿用了Spark Core對RDD提供的transformation操作,將所有RDD依次進(jìn)行轉(zhuǎn)換,應(yīng)用邏輯分別進(jìn)行轉(zhuǎn)換處理,進(jìn)而實(shí)現(xiàn)對整個(gè)離散流的轉(zhuǎn)換。
圖5-3-5展示了Spark Streaming的整體計(jì)算框架,一方面在線輸入的數(shù)據(jù)流被按照時(shí)間切分為若干小批次數(shù)據(jù)并被轉(zhuǎn)化成為RDD存儲在內(nèi)存中,另一方面,根據(jù)流應(yīng)用邏輯,也即流處理引用抽象出DAG拓?fù)?,制定出相?yīng)的RDD transformation。RDD不斷被批量執(zhí)行transformation操作,直到產(chǎn)生最終的結(jié)果。
圖 5-3-5 Spark Streaming 計(jì)算框架[7]
三、Spark Streaming中的并行度指定
由于Spark Streaming本質(zhì)上是將數(shù)據(jù)流的任務(wù)劃分成為大量的微批數(shù)據(jù),對應(yīng)多個(gè)job來執(zhí)行,所以Spark Streaming的并行度設(shè)定與Spark進(jìn)行批處理時(shí)的設(shè)定一樣,只能設(shè)定整體job的并行度,而不能對每個(gè)操作單獨(dú)的并行度進(jìn)行設(shè)置。然而由于批處理的特性,SparkStreaming可以最大化對系統(tǒng)并行能力的利用,也能獲得相對更高的系統(tǒng)吞吐率。
四、Spark Streaming中的數(shù)據(jù)分組和傳輸
由于使用微批處理技術(shù),Spark Streaming的數(shù)據(jù)被打包為一個(gè)個(gè)微批,而每個(gè)微批相互獨(dú)立地進(jìn)行處理,所以不涉及所提到的數(shù)據(jù)分組與傳輸問題。但這也展現(xiàn)出微批處理的一個(gè)局限性,其難以靈活處理基于用戶自定義的窗口的聚合、計(jì)數(shù)等操作,也不能進(jìn)行針對數(shù)據(jù)流的連續(xù)計(jì)算,如兩個(gè)數(shù)據(jù)流的實(shí)時(shí)連接等操作。
五、Spark Streaming的系統(tǒng)框架
Spark Streaming建立在Spark系統(tǒng)之上,其系統(tǒng)架構(gòu)相對于Spark的修改和新增部分如圖5-3-6所示。
圖 5-3-6 Spark Streaming 基于Spark修改和新增的組件[7]
除開Spark系統(tǒng)本身組件外,Spark Streaming主要組件如下。
master:是Spark Streaming中流應(yīng)用的入口。根據(jù)應(yīng)用邏輯產(chǎn)生用于轉(zhuǎn)換RDD的task然后進(jìn)行調(diào)度,并對這些task進(jìn)行追蹤。D-Streamlineage包含了離散流間的轉(zhuǎn)換關(guān)系,類似流應(yīng)用的DAG圖。
client:Spark Streaming建立了一個(gè)client庫來將數(shù)據(jù)傳入到系統(tǒng)當(dāng)中。
worker:是Spark Streaming中流數(shù)據(jù)的入口以及執(zhí)行RDD轉(zhuǎn)換的主要組件。相對于Spark,主要新增了input receiver對流數(shù)據(jù)進(jìn)行獨(dú)立的接收。流數(shù)據(jù)可以是從系統(tǒng)外在線地進(jìn)行讀取進(jìn)來,并轉(zhuǎn)化為離散流的形式,也可以是經(jīng)過其他execution執(zhí)行轉(zhuǎn)化后的離散流。
六、Spark Streaming的編程示例
Spark Streaming的編程較為簡單,這是由于它本身基于Spark建立,有豐富的API可以調(diào)用,可以省去大量無關(guān)的編碼。下面同樣以WordCount應(yīng)用為例來對Spark Streaming的編程模型進(jìn)行說明。
(1)離散流的輸入和數(shù)據(jù)封裝
在WordCount應(yīng)用中,假定直接從一個(gè)socket來獲取源源不斷的句子數(shù)據(jù)流,那么數(shù)據(jù)流的輸入具體實(shí)現(xiàn)如代碼5-3-4所示。
以上代碼中,首先建立了JavaStreamingContext對象,同時(shí)需要指定劃分離散流的時(shí)間間隔。本例中指定了每隔1s就劃分一次微批。接著,指定從端口8888的socket中持續(xù)獲取數(shù)據(jù)流。通過以上代碼,每個(gè)executor獲取的數(shù)據(jù)流就會根據(jù)1s的時(shí)間間隔不斷劃分成小批次,并進(jìn)一步轉(zhuǎn)化為RDD。這一串RDD的組合即是新產(chǎn)生的“l(fā)ines”離散流。
(2)建立應(yīng)用拓?fù)?,進(jìn)行離散流的轉(zhuǎn)化
離散流的轉(zhuǎn)化即根據(jù)相應(yīng)的應(yīng)用邏輯指定對應(yīng)的RDD的轉(zhuǎn)化方式。在WordCount應(yīng)用中,先將句子轉(zhuǎn)化為若干的單詞,然后將每個(gè)單詞變成(單詞,計(jì)數(shù))的二元對,最后對相同單詞的二元對計(jì)數(shù)進(jìn)行累加。具體實(shí)現(xiàn)如代碼5-3-5所示。
以上代碼中,利用Spark豐富的transformation方法,將由一個(gè)個(gè)句子組成的“l(fā)ines”離散流首先通過flatMap的方式映射為由單詞組成的“words”離散流。進(jìn)一步通過mapToPair的方式映射為(單詞,計(jì)數(shù))二元對組成的“pairs”離散流,這里每個(gè)單詞沒有累加前,計(jì)數(shù)值就直接等于1。最后通過reduceByKey的方式,對相同單詞的計(jì)數(shù)進(jìn)行累加操作。
Apache Flink
Apache Flink是一個(gè)同時(shí)支持分布式數(shù)據(jù)流處理和數(shù)據(jù)批處理的大數(shù)據(jù)處理系統(tǒng)。其特點(diǎn)是完全以流處理的角度出發(fā)進(jìn)行設(shè)計(jì),而將批處理看作是有邊界的流處理特殊流處理來執(zhí)行。Flink可以表達(dá)和執(zhí)行許多類別的數(shù)據(jù)處理應(yīng)用程序,包括實(shí)時(shí)數(shù)據(jù)分析、連續(xù)數(shù)據(jù)管道、歷史數(shù)據(jù)處理(批處理)和迭代算法(機(jī)器學(xué)習(xí)、圖表分析等)。
Flink同樣是使用單純流處理方法的典型系統(tǒng),其計(jì)算框架與原理和Apache Storm比較相似。Flink做了許多上層的優(yōu)化,也提供了豐富的API供開發(fā)者能更輕松地完成編程工作。
一、Flink中的數(shù)據(jù)封裝
Flink能夠支撐對多種類型的數(shù)據(jù)進(jìn)行處理,例如Flink支撐任意的Java或者Scala類型,這使得Flink使用更加靈活。類似Storm,F(xiàn)link同樣也可以使用多字段的tuple為其基本數(shù)據(jù)單元。Flink可以支持了多種Flink tuple類型(tuple1至tuple25),每種tuple都是一個(gè)固定長度的對象序列。
二、Flink中的應(yīng)用拓?fù)浣?/p>
Flink中核心概念為數(shù)據(jù)流(stream)和轉(zhuǎn)換(transformation)。每個(gè)轉(zhuǎn)換對應(yīng)的是一個(gè)簡單的操作,根據(jù)應(yīng)用邏輯,轉(zhuǎn)換按先后順序構(gòu)成了流應(yīng)用的DAG圖,如圖5-3-7所示。數(shù)據(jù)流在轉(zhuǎn)換之間傳遞,直到完成所有的轉(zhuǎn)換進(jìn)行輸出。Flink應(yīng)用包含明確的源操作和匯聚操作,用于數(shù)據(jù)的輸入與輸出。
Flink內(nèi) 部 實(shí) 現(xiàn) 了 許 多 基 本 的 轉(zhuǎn) 換 操 作, 比 如Map、FlatMap、Reduce、Window等, 同 時(shí) 也 實(shí) 現(xiàn) 了 許 多 源 和 匯 聚 操 作, 比 如writeAsText、writeAsCsv、print等。Flink提供了豐富的API以簡化用戶對應(yīng)用拓?fù)涞木帉懞捅磉_(dá)。
三、Flink中的并行度指定
與Storm相似,F(xiàn)link程序的計(jì)算框架本質(zhì)上也并行分布式的。在系統(tǒng)中,一個(gè)流包含一個(gè)或多個(gè)流分區(qū),而每一個(gè)轉(zhuǎn)換操作包含一個(gè)或多個(gè)子任務(wù)實(shí)例。操作的子任務(wù)間彼此獨(dú)立,以不同的線程執(zhí)行,可以運(yùn)行在不同的機(jī)器或容器上。
一個(gè)Flink應(yīng)用同樣運(yùn)行在一個(gè)或多個(gè)worker進(jìn)程當(dāng)中。一個(gè)worker中生成一個(gè)或多個(gè)task slot。每個(gè)task slot用以承載和執(zhí)行Flink每個(gè)轉(zhuǎn)換操作的一個(gè)子任務(wù)實(shí)例。Flink可以指定全局的task slot數(shù)目作為其最大的并行度。同時(shí)若部分轉(zhuǎn)換不需要使用如此多資源,F(xiàn)link也可以指定每一操作具體的子任務(wù)數(shù)。每個(gè)轉(zhuǎn)換操作對應(yīng)的子任務(wù)默認(rèn)輪詢地分布在分配的task slot內(nèi)。
四、Flink中的數(shù)據(jù)分組與傳輸
Flink的數(shù)據(jù)分組方法主要包括一對一(one-to-one)模式或者重分組(redistributing)模式兩種。
采用一對一模式時(shí),數(shù)據(jù)流中元素的分組和順序會保持不變,也就是說,對于上下游的兩個(gè)不同的轉(zhuǎn)換操作,下游任一子任務(wù)內(nèi)要處理的元組數(shù)據(jù),與上游相同順序的子任務(wù)所處理的元組數(shù)據(jù)完全一致。
采用重分組模式則會改變數(shù)據(jù)流所在的分組。重分組后元組的目標(biāo)子任務(wù)根據(jù)處理的變換方法不同而發(fā)生改變。例如經(jīng)過keyBy( )轉(zhuǎn)化,元組就會根據(jù)keyBy( )的參數(shù)選擇對應(yīng)的字段作為key值,進(jìn)行哈希計(jì)算來重新分組。經(jīng)過broadcast( )轉(zhuǎn)化即相應(yīng)地進(jìn)行廣播等。
五、Flink的系統(tǒng)框架
圖5-3-8顯示了Apache Flink的分布式運(yùn)行環(huán)境架構(gòu)。
Flink的系統(tǒng)架構(gòu)中包含以下重要組件。
jobclinet:jobclient是一個(gè)獨(dú)立的程序執(zhí)行入口。job client負(fù)責(zé)接收用戶提交的程序,并將用戶提交的程序通過優(yōu)化器和graph builder轉(zhuǎn)換成dataflow graph(類似流應(yīng)用的DAG圖)。然后將生成的data flow提交給job manager進(jìn)行job的管理和調(diào)度。一旦執(zhí)行完成,job client返回給用戶最后的執(zhí)行結(jié)果。
jobmanager:對應(yīng)一個(gè)Flink程序的master進(jìn)程,負(fù)責(zé)job的管理和資源的協(xié)調(diào)。主要包括任務(wù)調(diào)度、監(jiān)控任務(wù)的執(zhí)行狀態(tài)、協(xié)調(diào)任務(wù)的執(zhí)行、檢查點(diǎn)管理和失敗恢復(fù)等。
圖 5-3-8 Apache Flink分 布 式運(yùn)行環(huán)境[9]
taskmanager和taskslot:是Flink中具體負(fù)責(zé)執(zhí)行tasks的組件。每個(gè)taskmanage對應(yīng)是運(yùn)行在節(jié)點(diǎn)上的JVM進(jìn)程,擁有一定的量的資源。比如內(nèi)存、CPU、網(wǎng)絡(luò)、磁盤等。每個(gè)執(zhí)行的task運(yùn)行在其中的一個(gè)或多個(gè)線程中。taskslot是分布式程序真正執(zhí)行task的地方。每個(gè)taskslot可以包括JVM進(jìn)程中的一部分內(nèi)存。
六、Flink的編程示例
Flink的編程核心也就在 數(shù) 據(jù) 流 和 轉(zhuǎn) 換 上。 下 面, 依 然 以WordCount為例來對Flink的編程模型進(jìn)行說明。代碼5-3-6是Flink中以5分鐘為窗口進(jìn)行一次求和統(tǒng)計(jì)的WordCount應(yīng)用代碼。
在以上代碼中,定義了一個(gè)DataStream實(shí)例,并通過socket的方式從8888端口監(jiān)聽在線獲取數(shù)據(jù)。監(jiān)聽到的句子數(shù)據(jù)被使用flatmap轉(zhuǎn)化成單詞,并直接以(單詞,計(jì)數(shù))二元對的形式記錄下來。當(dāng)流被轉(zhuǎn)化為二元對后,接著根據(jù)當(dāng)前第0位的字段“word”進(jìn)行keyBy( )的操作,最后以5分鐘為窗口大小,對計(jì)數(shù)值進(jìn)行累計(jì)。
Flink的編程非常簡潔和直觀,上例中,DataStream從源操作從socket在線讀取數(shù)據(jù),到各種轉(zhuǎn)換操作,到最后的匯聚求和操作都可以直接表達(dá)出來。Flink提供了豐富的API和各種表達(dá)上的簡化來降低用戶的編程難度和編程量。
上例通過使用env.setParallelism來設(shè)置流處理程序的整體并行度,即taskslot數(shù)量為8。同時(shí),可以進(jìn)一步為每一個(gè)操作設(shè)置并行度,如在saveAsText( )操作后通過使用setParallelism將這個(gè)操作的并行度修改為1。
-
機(jī)器學(xué)習(xí)
+關(guān)注
關(guān)注
66文章
8502瀏覽量
134592 -
大數(shù)據(jù)
+關(guān)注
關(guān)注
64文章
8960瀏覽量
140177 -
編程模型
+關(guān)注
關(guān)注
0文章
8瀏覽量
1452
原文標(biāo)題:從Storm到Flink:大數(shù)據(jù)處理的開源系統(tǒng)及編程模型(文末福利)
文章出處:【微信號:rgznai100,微信公眾號:rgznai100】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
云計(jì)算、大數(shù)據(jù)處理技術(shù)交流
常用大數(shù)據(jù)處理技術(shù)歸類
如何從零學(xué)大數(shù)據(jù)?
阿里巴巴高級技術(shù)專家章劍鋒:大數(shù)據(jù)發(fā)展的 8 個(gè)要點(diǎn)
大數(shù)據(jù)處理系統(tǒng)模式及其應(yīng)用分析

開源大數(shù)據(jù)生態(tài)下的 Flink 應(yīng)用實(shí)踐
大數(shù)據(jù)海量數(shù)據(jù)處理方法總結(jié)
Apache Storm的安裝部署

評論