Mara-pipelines 是一個(gè)輕量級(jí)的數(shù)據(jù)轉(zhuǎn)換框架,具有透明和低復(fù)雜性的特點(diǎn)。其他特點(diǎn)如下:
基于非常簡(jiǎn)單的Python代碼就能完成流水線(xiàn)開(kāi)發(fā)。
使用 PostgreSQL 作為數(shù)據(jù)處理引擎。
有Web界面可視化分析流水線(xiàn)執(zhí)行過(guò)程。
基于 Python 的 multiprocessing 單機(jī)流水線(xiàn)執(zhí)行。不需要分布式任務(wù)隊(duì)列。輕松調(diào)試和輸出日志。
基于成本的優(yōu)先隊(duì)列:首先運(yùn)行具有較高成本(基于記錄的運(yùn)行時(shí)間)的節(jié)點(diǎn)。
此外,在Mara-pipelines的Web界面中,你不僅可以查看和管理流水線(xiàn)及其任務(wù)節(jié)點(diǎn),你還可以直接觸發(fā)這些流水線(xiàn)和節(jié)點(diǎn),非常好用:
1.安裝
由于使用了大量的依賴(lài),Mara-pipelines 并不適用于 Windows,如果你需要在 Windows 上使用 Mara-pipelines,請(qǐng)使用 Docker 或者 Windows 下的 linux 子系統(tǒng)。
使用pip安裝Mara-pipelines:
pip install mara-pipelines
或者:
pip install git+https://github.com/mara/mara-pipelines.git
2.使用示例
這是一個(gè)基礎(chǔ)的流水線(xiàn)演示,由三個(gè)相互依賴(lài)的節(jié)點(diǎn)組成,包括 任務(wù)1(ping_localhost), 子流水線(xiàn)(sub_pipeline), 任務(wù)2(sleep):
# 注意,這個(gè)示例中使用了部分國(guó)外的網(wǎng)站,如果無(wú)法訪(fǎng)問(wèn),請(qǐng)變更為國(guó)內(nèi)網(wǎng)站。 frommara_pipelines.commands.bash importRunBash frommara_pipelines.pipelines importPipeline, Task frommara_pipelines.ui.cli importrun_pipeline, run_interactively pipeline = Pipeline( id='demo', description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands') pipeline.add(Task(id='ping_localhost', description='Pings localhost', commands=[RunBash('ping -c 3 localhost')])) sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts') forhost in['google', 'amazon', 'facebook']: sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}', commands=[RunBash(f'ping -c 3 {host}.com')])) sub_pipeline.add_dependency('ping_amazon', 'ping_facebook') sub_pipeline.add(Task(id='ping_foo', description='Pings foo', commands=[RunBash('ping foo')]), ['ping_amazon']) pipeline.add(sub_pipeline, ['ping_localhost']) pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds', commands=[RunBash('sleep 2')]), ['sub_pipeline'])
可以看到,Task包含了多個(gè)commands,這些 command s會(huì)用于真正地執(zhí)行動(dòng)作。
而 pipeline.add 的參數(shù)中,第一個(gè)參數(shù)是其節(jié)點(diǎn),第二個(gè)參數(shù)是此節(jié)點(diǎn)的上游。如:
pipeline.add(sub_pipeline, ['ping_localhost'])
則表明必須執(zhí)行完 ping_localhost 才會(huì)執(zhí)行 sub_pipeline.
為了運(yùn)行這個(gè)流水線(xiàn),需要配置一個(gè) PostgreSQL 數(shù)據(jù)庫(kù)來(lái)存儲(chǔ)運(yùn)行時(shí)信息、運(yùn)行輸出和增量處理狀態(tài):
importmara_db.auto_migration importmara_db.config importmara_db.dbs mara_db.config.databases = lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')} mara_db.auto_migration.auto_discover_models_and_migrate()
如果 PostgresSQL 正在運(yùn)行并且賬號(hào)密碼正確,輸出如下所示(創(chuàng)建了一個(gè)包含多個(gè)表的數(shù)據(jù)庫(kù)):
Created database "postgresql+psycopg2://root@localhost/example_etl_mara" CREATETABLEdata_integration_file_dependency ( node_path TEXT[] NOTNULL, dependency_type VARCHARNOTNULL, hashVARCHAR, timestampTIMESTAMPWITHOUTTIMEZONE, PRIMARY KEY(node_path, dependency_type) ); .. more tables
為了運(yùn)行這個(gè)流水線(xiàn),你需要:
frommara_pipelines.ui.cli importrun_pipeline run_pipeline(pipeline)
這將運(yùn)行單個(gè)流水線(xiàn)節(jié)點(diǎn)及其 (sub_pipeline) 所依賴(lài)的所有節(jié)點(diǎn):
run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)
3.Web 界面
我認(rèn)為 mara-pipelines 最有用的是他們提供了基于Flask管控流水線(xiàn)的Web界面。
對(duì)于每條流水線(xiàn),他們都有一個(gè)頁(yè)面顯示:
所有子節(jié)點(diǎn)的圖以及它們之間的依賴(lài)關(guān)系
流水線(xiàn)的總體運(yùn)行時(shí)間圖表以及過(guò)去 30 天內(nèi)最昂貴的節(jié)點(diǎn)(可配置)
所有流水線(xiàn)節(jié)點(diǎn)及其平均運(yùn)行時(shí)間和由此產(chǎn)生的排隊(duì)優(yōu)先級(jí)的表
流水線(xiàn)最后一次運(yùn)行的輸出和時(shí)間線(xiàn)
對(duì)于每個(gè)任務(wù),都有一個(gè)頁(yè)面顯示
流水線(xiàn)中任務(wù)的上游和下游
最近 30 天內(nèi)任務(wù)的運(yùn)行時(shí)間
任務(wù)的所有命令
任務(wù)最后運(yùn)行的輸出
此外,流水線(xiàn)和任務(wù)可以直接從網(wǎng)頁(yè)端調(diào)用運(yùn)行,這是非常棒的特點(diǎn)。
責(zé)任編輯:haq
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7256瀏覽量
91887 -
python
+關(guān)注
關(guān)注
56文章
4827瀏覽量
86761
原文標(biāo)題:超級(jí)方便的輕量級(jí) Python 流水線(xiàn)工具,還有漂亮的可視化界面!
文章出處:【微信號(hào):LinuxHub,微信公眾號(hào):Linux愛(ài)好者】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
自動(dòng)化開(kāi)裝封碼流水線(xiàn)數(shù)據(jù)采集解決方案

遠(yuǎn)程io模塊在汽車(chē)流水線(xiàn)的應(yīng)用
RISC-V五級(jí)流水線(xiàn)CPU設(shè)計(jì)

工業(yè)讀碼器解決方案在自動(dòng)化流水線(xiàn)上掃描條碼的應(yīng)用

SMT流水線(xiàn)布局優(yōu)化技巧
工業(yè)流水線(xiàn)的智能助手——智能計(jì)數(shù),效率倍增

流水線(xiàn)中Half-Buffer與Skid-Buffer的使用

行云流水線(xiàn) 滿(mǎn)足你對(duì)工作流編排的一切幻想~skr
ADS900高速流水線(xiàn)模數(shù)轉(zhuǎn)換器(ADC)數(shù)據(jù)表

ADS930高速流水線(xiàn)模數(shù)轉(zhuǎn)換器(ADC)數(shù)據(jù)表

ADS901高速流水線(xiàn)模數(shù)轉(zhuǎn)換器數(shù)據(jù)表

ADS5421流水線(xiàn)式模數(shù)轉(zhuǎn)換器(ADC)數(shù)據(jù)表

ADS5413 CMOS流水線(xiàn)模數(shù)轉(zhuǎn)換器(ADC)數(shù)據(jù)表

ADS5237流水線(xiàn)式模數(shù)轉(zhuǎn)換器(ADC)數(shù)據(jù)表

ADS828流水線(xiàn)式CMOS模數(shù)轉(zhuǎn)換器數(shù)據(jù)表

評(píng)論