用于Apache katkatm的流式SQL引擎KSQL詳解
KSQL是一個用于Apache katkatm的流式SQL引擎。KSQL降低了進入流處理的門檻,提供了一個簡單的、完全交互式的SQL接口,用于處理Kafka的數(shù)據(jù)。你不再需要用Java或Python這樣的編程語言編寫代碼了!KSQL是開源的(Apache 2.0許可)、分布式的、可擴展的、可靠的和實時的。它支持廣泛的強大的流處理操作,包括聚合、連接、窗口、會話,等等。
一個簡單的例子
查詢流數(shù)據(jù)是什么意思,這與SQL數(shù)據(jù)庫有什么區(qū)別呢?
實際上,它與SQL數(shù)據(jù)庫有很大的不同。大多數(shù)數(shù)據(jù)庫都用于對存儲數(shù)據(jù)進行按需查找和修改。KSQL不進行查找(但是),它所做的是連續(xù)的轉(zhuǎn)換——也就是,流處理。例如,假設我有一個來自用戶的點擊流,以及一個關于這些用戶不斷更新的帳戶信息的表。KSQL允許我對這一串單擊和用戶表進行建模,并將兩者結(jié)合在一起。即使這兩件事之一是無限的。
因此,KSQL所運行的是連續(xù)查詢——在Kafka主題的數(shù)據(jù)流中,連續(xù)不斷地運行新數(shù)據(jù)。相反,傳統(tǒng)數(shù)據(jù)庫對關系數(shù)據(jù)庫的查詢是一次性查詢——在數(shù)據(jù)庫中運行一次SELECT語句獲取有限行的數(shù)據(jù)集。
KSQL的好處是什么?
很好,所以你可以不斷地查詢無限的數(shù)據(jù)流。這有什么好處?
1 實時監(jiān)控實時分析 CREATETABLEerror_counts ASSELECTerror_code,count(*)FROMmonitoring_stream WINDOW TUMBLING (SIZE1MINUTE) WHEREtype =‘ERROR’
其中的一個用途是定義定制的業(yè)務級度量,這些度量是實時計算的,您可以監(jiān)視和警報,就像您的CPU負載一樣。另一個用途是在KSQL中定義應用程序的正確性的概念,并檢查它在生產(chǎn)過程中是否會遇到這個問題。通常,當我們想到監(jiān)控時,我們會想到計數(shù)器和儀表跟蹤低水平的性能統(tǒng)計。這些類型的測量器通??梢愿嬖V你CPU負載很高,但是它們不能真正告訴你你的應用程序是否在做它應該做的事情。KSQL允許從應用程序生成的原始事件流中定義定制指標,無論它們是日志事件、數(shù)據(jù)庫更新還是其他類型的事件。
例如,一個web應用程序可能需要檢查,每次新客戶注冊一個受歡迎的電子郵件,創(chuàng)建一個新的用戶記錄,并且他們的信用卡被計費。這些功能可能分布在不同的服務或應用程序中,您可能希望監(jiān)視每個新客戶在SLA中發(fā)生的每一件事,比如30秒。
2 安全性和異常檢測 CREATESTREAM possible_fraud ASSELECTcard_number, count (*) FROM authorization_attempts WINDOW TUMBLING (SIZE 5 SECONDS) GROUP BY card_number HAVING count(*)》 3;
這是您在上面的演示中看到的一個簡單的版本:KSQL查詢,它將事件流轉(zhuǎn)換為數(shù)值時間序列,使用Kafka-Elastic連接器將其注入到彈性中,并在Grafana UI中可視化。安全用例通??雌饋砗芟癖O(jiān)視和分析。而不是監(jiān)視應用程序的行為或業(yè)務行為,您正在尋找欺詐、濫用、垃圾郵件、入侵或其他不良行為的模式。KSQL提供了一種簡單、復雜和實時的方式來定義這些模式和查詢實時流。
3 在線數(shù)據(jù)集成 CREATESTREAM vip_users ASSELECTuserid, page,actionFROMclickstream c LEFTJOINusers u ONc.userid = u.user_id WHEREu.level=‘Platinum’;
在公司中完成的大部分數(shù)據(jù)處理都屬于數(shù)據(jù)豐富的領域:從幾個數(shù)據(jù)庫中提取數(shù)據(jù),轉(zhuǎn)換它,將其連接到一個鍵值存儲、搜索索引、緩存或其他數(shù)據(jù)服務系統(tǒng)中。在很長一段時間內(nèi),用于數(shù)據(jù)集成的ETL-提取、轉(zhuǎn)換和加載-作為周期性的批處理作業(yè)執(zhí)行。例如,實時轉(zhuǎn)儲原始數(shù)據(jù),然后每隔幾個小時轉(zhuǎn)換一次,以實現(xiàn)高效的查詢。對于許多用例來說,這種延遲是不可接受的。KSQL與Kafka的連接器一起使用時,可以從批處理數(shù)據(jù)集成到在線數(shù)據(jù)集成。您可以使用流-表連接存儲在表中的元數(shù)據(jù)來豐富數(shù)據(jù)流,或者在將流加載到另一個系統(tǒng)之前對PII(個人可識別的信息)進行簡單的過濾。
4 應用程序開發(fā)
許多應用程序?qū)⑤斎肓鬓D(zhuǎn)換為輸出流。 例如,負責重新排序在線商店庫存不足的產(chǎn)品的流程可能會產(chǎn)生銷售和出貨流,以計算出訂單流。
對于用Java編寫的更復雜的應用程序來說,Kafka的原生流API可能幫助不大。但是對于簡單的應用程序,或者對Java編程不感興趣的團隊來說,一個簡單的SQL接口可能就是他們想要的。
KSQL中的核心抽象
KSQL在內(nèi)部使用Kafka的Streams API,并且它們共享與Kafka流處理相同的核心抽象。 KSQL有兩個核心抽象,它們映射到Kafka Streams中的兩個核心抽象,并允許您操縱Kafka主題:
1.流:流是無限制的結(jié)構化數(shù)據(jù)序列(“事實”)。 例如,我們可以有一個金融交易流,例如“Alice向Bob發(fā)送了100美元,然后查理向鮑勃發(fā)送了50美元”。 流中的事實是不可變的,這意味著可以將新事實插入到流中,但是現(xiàn)有事實永遠不會被更新或刪除。 流可以從Kafka主題創(chuàng)建,或者從現(xiàn)有的流和表中派生。
CREATESTREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)WITH(kafka_topic=‘pageviews’, value_format=’JSON’);
2。表:一個表是一個流或另一個表的視圖,它代表了一個不斷變化的事實的集合。例如,我們可以擁有一個包含最新財務信息的表,例如“Bob的經(jīng)常帳戶余額為$150”。它相當于傳統(tǒng)的數(shù)據(jù)庫表,但通過流化等流語義來豐富。表中的事實是可變的,這意味著可以將新的事實插入到表中,現(xiàn)有的事實可以被更新或刪除??梢詮腒afka主題中創(chuàng)建表,也可以從現(xiàn)有的流和表中派生表。
CREATETABLEusers (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH(kafka_topic=‘users’, value_format=‘DELIMITED’);
KSQL簡化了流應用程序,因為它完全集成了表和流的概念,允許使用表示現(xiàn)在發(fā)生的事件的流來連接表示當前狀態(tài)的表。 Apache Kafka中的一個主題可以表示為KSQL中的STREAM或TABLE,具體取決于主題處理的預期語義。 例如,如果要將主題中的數(shù)據(jù)作為一系列獨立值讀取,則可以使用CREATE STREAM。此類流的一個例子是捕獲頁面視圖事件,其中每個頁面視圖事件都不相關且獨立于另一個頁面視圖事件。另一方面,如果您希望將某個主題中的數(shù)據(jù)讀取為可更新的值的集合,那么您將使用CREATE TABLE。在KSQL中應該讀取一個主題的示例,它捕獲用戶元數(shù)據(jù),其中每個事件代表特定用戶id的最新元數(shù)據(jù),如用戶的姓名、地址或首選項。
KSQL:實時點擊流分析和異常檢測
讓我們來看一個真正的例子。這個例子展示如何使用KSQL進行實時監(jiān)視、異常檢測和警報。對clickstream數(shù)據(jù)的實時日志分析可以采取多種形式。在本例中,我們將標記在web服務器上消耗過多帶寬的惡意用戶會話。監(jiān)視惡意用戶會話是會話化的眾多應用之一。但從廣義上說,會話是用戶行為分析的基礎。一旦您將用戶和事件關聯(lián)到一個特定的會話標識符,您就可以構建許多類型的分析,從簡單的度量,例如訪問計數(shù)。我們通過展示如何在Elastic支持的Grafana儀表板上實時顯示KSQL查詢的輸出,來結(jié)束這個例子。
您也可以按照我們的指示,親自完成例子,并查看代碼。
非常好我支持^.^
(0) 0%
不好我反對
(0) 0%