From 6bcffa9b9096f1e570747df983c582611e20d3b0 Mon Sep 17 00:00:00 2001 From: qianmoQ Date: Mon, 6 Mar 2023 22:43:39 +0800 Subject: [PATCH] [Plugin] Support kafka for native --- README.md | 3 + assets/plugin/kafka.png | Bin 0 -> 6683 bytes core/datacap-server/pom.xml | 5 + .../main/etc/conf/plugins/native/kafka.json | 19 +++ .../public/static/images/plugin/Kafka.png | Bin 0 -> 2742 bytes plugin/datacap-native-kafka/pom.xml | 71 +++++++++ .../plugin/natived/kafka/KafkaAdapter.java | 147 ++++++++++++++++++ .../plugin/natived/kafka/KafkaConnection.java | 60 +++++++ .../plugin/natived/kafka/KafkaParser.java | 27 ++++ .../plugin/natived/kafka/KafkaPlugin.java | 80 ++++++++++ .../natived/kafka/KafkaPluginModule.java | 38 +++++ .../io.edurt.datacap.spi.PluginModule | 1 + .../natived/kafka/KafkaPluginModuleTest.java | 30 ++++ .../plugin/natived/kafka/KafkaPluginTest.java | 43 +++++ pom.xml | 1 + 15 files changed, 525 insertions(+) create mode 100644 assets/plugin/kafka.png create mode 100644 core/datacap-server/src/main/etc/conf/plugins/native/kafka.json create mode 100644 core/datacap-web/console-fe/public/static/images/plugin/Kafka.png create mode 100644 plugin/datacap-native-kafka/pom.xml create mode 100644 plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaAdapter.java create mode 100644 plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaConnection.java create mode 100644 plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaParser.java create mode 100644 plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPlugin.java create mode 100644 plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModule.java create mode 100644 plugin/datacap-native-kafka/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule create mode 100644 plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModuleTest.java create mode 100644 plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginTest.java diff --git a/README.md b/README.md index 857691e2..3c9bc2e5 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,9 @@ Here are some of the major database solutions that are supported:   Aliyun OSS +   + + Apache Kafka

diff --git a/assets/plugin/kafka.png b/assets/plugin/kafka.png new file mode 100644 index 0000000000000000000000000000000000000000..7668eed988f4af939c20307fb92cef39d204a520 GIT binary patch literal 6683 zcmXw8dpy(M|L-D^WYNV`%Vvfo%(WO{jW?EDQgbWJklQC++>%eBHqB+5#-^s+l55Ci z7=0qlHMxtaOz!u~M3>(_-|zSL&-?Mdobx=d_w#igugmk0E?qR2+$Xnh$BrG6fCbua z$BvzV@SY(iD!gViv}oZP7KEebBTvBeoahf0k ziTzOc-n|5VRph`EGfxYPzhp(Y_rkun4PCRXtl4^&XCJw}5t+`HoNFIhsB`TLdY*Ug z@~>}z7_{+}`pilE@??6H(XW*@bsOVfYdt-Q#=kdDE|*=4`n5it9`*aRF(L-^7}f`m z+Mcg`RT(s96uH)IZlms2JJ{QnqMpvH4O?AnkTNz*%LADnk3U3%d;FX$>pEktks;tk zEKcVa*M)EW5{%6@NI)C=WRbxB`PDAtdW~A!$kh(LnK%jP0h3DL{-0lhqu=STrHp?r zvA55}fCjSK;L}G2a&)b%*Mx?ai>2|8Ub}bG9bfjLF#%-i5}gFNHOOPLB*mePyejO< z=b(CyMiD1l)AHYEYw25glCyFiNb@qDDG_y(!Eb12h;?_h%GxU&@8-{h(-7shVe2z} z`IC&L#k%c3orHV1lc4I&>AxxIRd4Lkg4Uv z>kYy&icP#j|9$>-=f@tG68L0JXM*M6f` zFt#^ORAZ&5&LPKL(%$8^X!5hKw2YXsdSC0kSDwUdIx^mzlDMT2 zV6sxD$7MZ=;feV1Q;Hsp7o2#Ko-*yReupj>zo3t~t&;z`yrwSX)z>Mv zJK1W*y*C2}I7UHMoWlu^HJLA1<(Uy}J1hej_VEl!C%;C2Q}T(-s*t<&DZCGXQor)A z6wh4aWV%$Hf7^k|ST2XkY#(qrCV5)LRp;k{)kYsb8vNV%RfOD>qM6`h&yE2q07y1-&;g0GrW9EkDp>(0e^4@*KSd?3~xWoTHdRhg~$(hA{g ztg7I(pO)UOq0#&1kgroyjCb)zIuJaM&M&j3E`&t}Mf6oe7<^6rasabmRWe zL4AKjGqhmVi+*P{xP|8`@Kh_OH0ZU5+Z6WO%ZRXhXZ#|ioe#@)=yENbFc`Eq8Wx}3 zOIKFgtvsOv91BvtVlZSiJX7?B0T(qtM0{|m$jd2S$8jR0+^1*df+osnj0vchSWv^8 zG?k5ICclQ$t*L&he~w(R`@8*18JuQlB;+v&ZJwOW7&*aMH_QN>&dFe!P-wcmYvN8O zu=0WleL5(?OFAKeL!4)CwcP)%@}y9ci<8mozZ%+Dr7RfHeb%(*qp`{Ug`Ew@Q=7l^ zBBtZvWmx2j{D}@YN{SHHVI`xd$jTkAUhA(a=*UWWYh9{SgJRyDD75z}40*4Ed&{ea z*x=+De3u+XRy?-6vFbA3P}vd_y8tZ}eDAjYl+lunX+UJRmNe3V;TuX#dMVX!?~ZQZ zj!8^-Tg`q3>s7Sm55UcvS)8y+NaiH#?M%J(ifk4GzUSf|vMgE!Qag9$Ed~)dQ&Qnp zCFSfFVs*mzTaz@~S=~uXrVNwqMWwsGxf{g3ayNiLkj;O-+g4rKJR+9(ytzrkm+ZHB zA}aCg0D=u)j5}7;QFC2B)5VM0e0>m3I%DXXM-0_K!U~Y!a4Vj8v|7gQZL5RM$@HDc zPRLSeIsK+KCtIBf4=dooUPf*JS4`8uI{T?d!%cl}@K<-1i=Qt&6|&wgXG=~(W}4(_ zYBkKMJ^awF>5zcim$|On{;a3R%oIFvO1R-i5r85QeFFuPmi48mJs8U0@0l<{D-ZNZ zK276084lS086EbXsdx6AU)%&!<+vfZli!<2i#mMp**8vb}nd^7NgCeOwry`z;PNXbXSlce=YCB&K zzFpW_bCt?iWZ3S|C{PVlE0Qa1|)0fHp`Bo(1l>`j8$lx72@n8Aqraq z`26$j8L`CF_NnJdFYGSR>N7oX$0osy+tZgFFY={@yd(iUrK5uU_9b1`D=aLj{r*X{pXKd;`%SDM zq6OakFWetZE^A?-K$%a)K0jH!=mmb1f-KO zh$uc-f^zKIkL+HWA{$A8$~I7wCruD+q3^c&&jHMp{^T0_u9$t7(rdE2_caVEHovG@ z1czV$0TVpz&EWz0ws`8cJgwG-$H4PX^yRX|7^GJo%)xABRa{z3xSZ4}RnN5SP23tb zqD(&%C?&8Wx?q)%^5;nh*x>SJJ=nTWg8=_q5Z& z(efo;I)uh|pLCBW?i=%zHZ;Uv88oP?M5Fw%?aLqg1}d@~Bcb7M>&O4ua7n|;_D|yM z{I1--f9+%hfI7dX)UI>Hea?zu6&r03=+VQ+pP74 zRA*=pT(gC>_jR9F;qu)SOygNR^-reWLx0z6lI!`u?^P_N=cnScE(;E|?1!1{LUO$V z7<`E6bm159O2Xskqwp?|B*bFrgad8O2(Rp?p>4^J>=%wIx26_dwK;0wkyNH+5`Kq3 z8)=aD9?NvgW@f>-p9#J22qj;bDAf@d3rrz);x8W*KzKAz%=N1dggGwnnZO7u5EIIan`uF>c<=S(8w}7?#2j!rQY9Ep-y%_Sc zi15*q%CU{;l|L&bX%hL2URBgTlcR_U_O-{_sRjn7AV1o-fndzOoWnQ|fNH3UYxJ&hRBVrdza7!C)^S0C>-@u_^k!|v4F_CjR9eLZl!w>DPrD2i3 z!ABb#4nO}eEFI>!o;rZ43;CQ`Abxgncjvp`r)x3uYU$3pToyD}5q9fn*%IB4uH<@8 z=9H;6Q@cB0KAJ;o z`!|MR^+lpW20BMd z00g@r{GQRn%A$_W%#GyP+q4)9!Y>^@+t$3-=Y2$cR;NK1$w`;x8rQ_2c+JNB&ChyS?mYU^!lf* z$Dce&g?nDGjb0>ez&J^BU-vddNP25*Qx~m<)SQ7XZWh2 z)dhhTpN9@%y{t((j19+rYWkxgm|Dj?7J~uJxU=_kcv$DaQF!6OahAEjtzh>7sddeM zKBw5W)~`844|s8yv;di~g0stIhjKKO%Ns-^9kezAXtxW0a^cQ_N@2>*wMux~Z^Vzai0JK__E)>6wjxB|vX zXBa$;R#dQS4u&%fN^D+#bk=1<|Au50(0}Z5wCbgCk zs3WUeL6vVeuJ@z+x!!XZA@isgWlTJNrJW!D6V`QBP~crv07vEro_hxLhf+}tI9K}v zvt>84Z`dSjmlwJ{6CuvqTbk@Wfzd9_;h~#QRVOdB7yz?rn^r_p?r77psNs(+EyEDO;Ta6O_Po`i)=K5|mwp&`pE<%cQV+e<}(!}b$*JYx*gLU(mNkSx~ZI?;`di$gTd3ENSs zo9!d_3uJJ^pj_9gMgts*vdRDiGfi!HN)yCB3$^S1tmiWsq47e!)=HE6bwV z4J>~%XyyV$XowGM&_^2_4>RxbZbt%;HH%KAm4XrZj5YsyV;Dqaad3$vY#Kq4t2;bn zon_7@`)NNwog#M`9GKArSSF7qd_*?yi-^j4oux$Fpv}GB-@VU!=@Q!cQ?}`xJ6%XY z#@VLoJzi!k|6fD4%NowT3N)8j-b%_g>=jN#xI6^M4p-Wl!Ycj*pZ;KL?%yYWzw!9a zW_2Y8dn}b^u4=R=_Eu2rDks$Wxt;CmcY?X|rB9CGv>ZPI6TSwAM~MF1sUVA8*XWzj zZGfW&!X*&m!fZR59P62wLtrdDF5RB(cs_yztaNtWYojAp*~npQO<>WiT9J%UP|Ok8 zo!>QnaNwKf0&j7l&>td8R<~E7flk&{GMB695W#LV4_ZGF;7bRoWdr5^64snnS!To=H*k=k`R(mfje6|~JULRav z{gdrJJSKyq@BEMi1S4$(3a6@DaJEN6omq-E8&nR}spk0Vm3c4KI1pC081IOof01@+ zW>s5RL}uQ)|3bVQi<>l#2^1m`#OK1Y< z+g6NqX~+&;GmciM4iDW6o_X$i#MDKq66kD=j8pbJU+Mia>QQNiJG8M**p7wtwE26@ z{k1=$Sm&w9zMC~dHMi^{4tNP}4aX{u`6X($7J$Q{jdupMxvaygi0blttU%W ziWR5B6iHr=Gjg2AwAE#nlG>|6zNJ=%xl>FufwfpVKE?NE&UgWnVx?uM@q~YgaN6Eb z3tZzVadUBh8l0xanAWGfRbb%L$coQQM-21=sOwgZu^?EQI}JS(C;{zk71q0MT}f;k z5BLLD8NL6Z?ID&AgO^r{w+7XG!7u)+C`T#hXk)1hAO98PfdEp6($%~{q*e7*ie`%)$GKD-icZb}b`h?mC0{9a ziNxnnxA?+guovP3uEk62=r z{YwnG(db150cX+`6%!?(8%+UJjdOH;SO5VUQzZ(e#rVm3eb1>g6*~y^r-Fvc?>Sh? z34KXBLyKvtOGJ54F`h}LM8BQ@>bv0gE&T*H%J)p1KeX|n@bn@&@XffTo_k s2|nhWYo!*Wf3C;cP7K=l?$6G@z3e(u;PUr{hdw)i3m4Hvrq>_-ACh|Zn*aa+ literal 0 HcmV?d00001 diff --git a/core/datacap-server/pom.xml b/core/datacap-server/pom.xml index 7bad66bd..fd7a8418 100644 --- a/core/datacap-server/pom.xml +++ b/core/datacap-server/pom.xml @@ -274,6 +274,11 @@ datacap-native-alioss ${project.version} + + io.edurt.datacap + datacap-native-kafka + ${project.version} + diff --git a/core/datacap-server/src/main/etc/conf/plugins/native/kafka.json b/core/datacap-server/src/main/etc/conf/plugins/native/kafka.json new file mode 100644 index 00000000..5f9c4db7 --- /dev/null +++ b/core/datacap-server/src/main/etc/conf/plugins/native/kafka.json @@ -0,0 +1,19 @@ +{ + "name": "Kafka", + "supportTime": "2023-03-06", + "configures": [ + { + "field": "name", + "type": "String", + "required": true, + "message": "name is a required field, please be sure to enter" + }, + { + "field": "host", + "type": "String", + "required": true, + "value": "127.0.0.1:9092", + "message": "host is a required field, please be sure to enter" + } + ] +} diff --git a/core/datacap-web/console-fe/public/static/images/plugin/Kafka.png b/core/datacap-web/console-fe/public/static/images/plugin/Kafka.png new file mode 100644 index 0000000000000000000000000000000000000000..72c6a29279bcc47bf94c16ee3f617c029b6bf910 GIT binary patch literal 2742 zcmV;n3Q6^eP)^8Wq%%abQhZiQ9(?Af!+r%#{k%ku{aoqmm`3ckZ{PkSTnRsZ{Agd|@R&kua<3Ju2Fdp){$RM+m@g|*O`d}P z!BDXg!iv*^U%q^?FMUW16Z_k@Z~vE4t@+@=gIkw>bPN+4*K9@EA~p5v*RO9)7ml6U znmqsT;lqD^c8hP`ys>ZLa2O&s%@Qld8fl&&?AEIM`Sa(ki1zokcytUI0mXe2dewTF zW{Q#$Xx1DVL&UZsSRx#&KLSF$y^iJc=g;l09SYsWRzf%pmK?lz@#3FrEy*Mz^dA-7 z#U_+m5iD~B!MxdPSyBZY72U=53>7YY%aD2C_w4m7kK9J>F1BaR=%|$dV`s0qL5NN0k9Fp!}VFy*gQYVTjlua(a~Ti?Ks^az1~uN|_AyXSaw5HGqJK zscx_G-kFdyV3^p7QL9B(e(9>__r31l9@BR1NDLDj)IL$V#nRfEND_%6dt{}L3hzV3 z1~gIZfYfB^36+Q^P?=M0)WC4D*}P~acxj$SXnW0CB&_!qe7M*NUPC0x+&%FG$~k+@ z+9WlUdWi_85gU+Onelc|B4hxgr(7(EHl^=FB|V2;m`ZHGKu1cOL`u0@+n3&EjV1S| zAsW$%loWmMy>3XHPV5B28_k!!@Q%?z7*=xbfd>nmvVEN`BJkR4^WG7VE};FLHS%*k zFs;}Lm>4@aiKTnrYfx)xwPyDA?-QBWtuejWdq$~tzGFoyojq0P1Z6LPNzodo*bD5v zR>^BJiM)*(YWdbU#a=+Et8;5(@6blcwN;#AFW|Xw%CTJRd5=VIhp!Z;*bD4A|I*jP z<$I41X!-l)%f`Fc)Xe z-T*V4Ri^f*X-^_l>0l{$g66)4(yVE{&pnOUoO@um=!`5|NaYlNxd`L~O;G)+Q@=DR}M4 z#5q z?Yyk`+_27M=}}AXp^Mm&a9d(pSb8NS=a|qL{DJ&r%y@pH@JmrmeLCl=EJRy&4)bpt;zpv8jC!L3Plj z_k`UgG#A?r?iOpWF!d`bdiAAeKvS{R)@X~pT=i=yTD{UYI8%`h=@Ra0#hqitv-1G)`FTo zVD!fNyNr4xyaDAdi^N*1Om*R~mO0}(iL;`)*veA}mMneZQ6!SU+%_!+koU?_E1E*& z{&d2W6vF#pqR%9LbFo46Y<6z`&OPH6i6{sdi5!s9a`eFk&_(Q=Lj|lB)uB_*tF6&3 zVkGyrSP`rAoH;9gWCN#**dVg6y(FSzq7Z^f!(j0zgHHrd8z(B)wmfs+`xaZ@hrNYv zVw1Z{grWIC?uXGR5hW8W9XyPB0F@)EfNo*~<*S%xA(N`ssnso(J_kLDu404eNz>XS z^hc_L*4*<6wb5N{5OwFZm~v(0R@&CY+4t(r4-p&GHa#BKOi_&}lOC3vPL>imq9x#( zDQtmZVgquPa&)MKUP(oHmt!2gFL=R_D$h}YOqd?cm!V<<23y^BQB~*!<)mxLGuBj0BQ{`#G2ndgXm}^Ak}Gq2 z-9Xtb%gwKfA#9EIuaF6H7;VtKRY9&x-{e;V%AF*+xmAf8d)}f3rWJb$J1B{@ziz?n zmX#czQjSTr{x*R-EjBs6W{v8VOp)C&3#DDWRvy;GDfR+hd-N2$1MHwA))w1a*(GU% z63{!3cZTsxKe3grUM>`;pHnVIkpJ#G!VXGeZLyV`8giQ-UW$MsGYqD1WONED!9{Pml&okD9sOu;9e7iYbl*TerD~FT(vQ+ z*tl0V5%zK&D~582FsZII1zw4?B$0skiSYEyF(Cao?Qr6@i zLAFqGY%TIRd#q|OmDoye+9QKkqgU?L+_l*A9NH9OW3m)u6k+b})DqE~0ix{r_-Qa) z?3U-jN^>Fat(D-do;I8~FqgBe>uGbQnKc$S?BU_}c z``>#QCU(yo1`w%6X@+a|_Pp0fiO6KB@g7PrOzg-_a)f$TCpGM{mW|}o()_&yqVawR z?^`7zpD*w72U<&I~oje&bfjpr$IaqaLwp0ws)IN z>75bwdX_b6?z@WaVtaKep|}K|QLD8xb{BDVVsF*GiMf8@8qi&AuSJSu&OK_RJS(;5 zmf~#aF1E5~9WA>OltTxnLF)(x+X`TryU*)2Yv{D)I{j-vcd>yLVU5uP^U5PLN}c{) z#}KhSlLj$b>P_!6J>6ht3=vyten7%5Iex20PDu9Z_9c&wVI$z>*Co?-AuE$roSp{k zm03$3eG>!>6PplSdh$ahlOngAyCw`18z^O14c0hXd<_^XHc-r336I<{vluQmh?uMh zPoe+D6k-E!>oY>UK?6`b^;| + + 4.0.0 + + io.edurt.datacap + datacap + 1.7.0-SNAPSHOT + ../../pom.xml + + + datacap-native-kafka + DataCap - Kafka + + + 2.8.0 + native-kafka + + + + + io.edurt.datacap + datacap-spi + provided + + + io.edurt.datacap + datacap-common + + + commons-beanutils + commons-beanutils + + + io.edurt.datacap + datacap-parser + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + + + + maven-assembly-plugin + ${assembly-plugin.version} + + ${plugin.name} + + ../../configure/assembly/assembly-plugin.xml + + ../../dist/plugins/${plugin.name} + + + + make-assembly + package + + single + + + + + + + + diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaAdapter.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaAdapter.java new file mode 100644 index 00000000..10bae61c --- /dev/null +++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaAdapter.java @@ -0,0 +1,147 @@ +package io.edurt.datacap.plugin.natived.kafka; + +import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.edurt.datacap.spi.adapter.NativeAdapter; +import io.edurt.datacap.spi.model.Configure; +import io.edurt.datacap.spi.model.Response; +import io.edurt.datacap.spi.model.Time; +import io.edurt.datacap.sql.SqlBase; +import io.edurt.datacap.sql.SqlBaseToken; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +@Slf4j +@SuppressFBWarnings(value = {"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE", "REC_CATCH_EXCEPTION"}, + justification = "I prefer to suppress these FindBugs warnings") +public class KafkaAdapter + extends NativeAdapter +{ + protected KafkaConnection kafkaConnection; + private final KafkaParser parser; + + public KafkaAdapter(KafkaConnection kafkaConnection, KafkaParser parser) + { + super(kafkaConnection, parser); + this.kafkaConnection = kafkaConnection; + this.parser = parser; + } + + @Override + public Response handlerExecute(String content) + { + Time processorTime = new Time(); + processorTime.setStart(new Date().getTime()); + Response response = this.kafkaConnection.getResponse(); + Configure configure = this.kafkaConnection.getConfigure(); + if (response.getIsConnected()) { + List headers = new ArrayList<>(); + List types = new ArrayList<>(); + List columns = new ArrayList<>(); + try { + SqlBase sqlBase = this.parser.getSqlBase(); + if (sqlBase.isSuccessful()) { + AdminClient client = this.kafkaConnection.getClient(); + if (ObjectUtils.isNotEmpty(this.parser.getSqlBase().getColumns())) { + headers.addAll(this.parser.getSqlBase().getColumns()); + } + else { + headers.add("*"); + } + types.add("String"); + this.adapter(client, sqlBase) + .forEach(column -> columns.add(handlerFormatter(configure.getFormat(), headers, Collections.singletonList(column)))); + response.setIsSuccessful(Boolean.TRUE); + } + else { + Preconditions.checkArgument(!sqlBase.isSuccessful(), sqlBase.getMessage()); + } + } + catch (Exception ex) { + log.error("Execute content failed content {} exception ", content, ex); + response.setIsSuccessful(Boolean.FALSE); + response.setMessage(ex.getMessage()); + } + finally { + response.setHeaders(headers); + response.setTypes(types); + response.setColumns(columns); + } + } + processorTime.setEnd(new Date().getTime()); + response.setProcessor(processorTime); + return response; + } + + private List adapter(AdminClient client, SqlBase info) + { + List array = new ArrayList<>(); + if (info.getToken() == SqlBaseToken.SHOW) { + if (info.getChildToken() == SqlBaseToken.TOPICS) { + this.adapterShowTopics(client, array); + } + else if (info.getChildToken() == SqlBaseToken.CONSUMERS) { + this.adapterShowConsumers(client, info, array); + } + } + return array; + } + + private void adapterShowTopics(AdminClient client, List array) + { + try { + client.listTopics() + .listings() + .get() + .forEach(v -> array.add(v.name())); + } + catch (Exception e) { + Preconditions.checkArgument(false, ExceptionUtils.getMessage(e)); + } + } + + private void adapterShowConsumers(AdminClient client, SqlBase info, List array) + { + try { + if (StringUtils.isNotEmpty(info.getTable())) { + client.listConsumerGroups() + .all() + .get() + .parallelStream() + .forEach(v -> { + try { + DescribeConsumerGroupsResult describeConsumerGroupsResult = client.describeConsumerGroups(Collections.singleton(v.groupId())); + ConsumerGroupDescription consumerGroupDescription = describeConsumerGroupsResult.all().get().get(v.groupId()); + if (consumerGroupDescription.members().stream().anyMatch(member -> + member.assignment().topicPartitions().stream().anyMatch(tp -> + tp.topic().equals(info.getTable().replace("`", ""))))) { + array.add(v.groupId()); + } + } + catch (Exception e) { + Preconditions.checkArgument(false, ExceptionUtils.getMessage(e)); + } + }); + } + else { + client.listConsumerGroups() + .all() + .get() + .forEach(v -> array.add(v.groupId())); + } + } + catch (Exception e) { + Preconditions.checkArgument(false, ExceptionUtils.getMessage(e)); + } + } +} diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaConnection.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaConnection.java new file mode 100644 index 00000000..ae312c9f --- /dev/null +++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaConnection.java @@ -0,0 +1,60 @@ +package io.edurt.datacap.plugin.natived.kafka; + +import io.edurt.datacap.spi.connection.Connection; +import io.edurt.datacap.spi.model.Configure; +import io.edurt.datacap.spi.model.Response; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; + +import java.util.Properties; + +@Slf4j +public class KafkaConnection + extends Connection +{ + private Configure configure; + private Response response; + + @Getter + private AdminClient client; + + public KafkaConnection(Configure configure, Response response) + { + super(configure, response); + } + + @Override + protected String formatJdbcUrl() + { + return null; + } + + @Override + protected java.sql.Connection openConnection() + { + try { + this.configure = getConfigure(); + this.response = getResponse(); + log.info("Connection url {}", formatJdbcUrl()); + Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configure.getHost()); + this.client = AdminClient.create(properties); + response.setIsConnected(Boolean.TRUE); + } + catch (Exception ex) { + log.error("Connection failed ", ex); + response.setIsConnected(Boolean.FALSE); + response.setMessage(ex.getMessage()); + } + return null; + } + + @Override + public void destroy() + { + this.client.close(); + log.info("Connection close successful"); + } +} diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaParser.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaParser.java new file mode 100644 index 00000000..2bf46863 --- /dev/null +++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaParser.java @@ -0,0 +1,27 @@ +package io.edurt.datacap.plugin.natived.kafka; + +import io.edurt.datacap.spi.parser.SqlParser; +import io.edurt.datacap.sql.SqlBase; +import io.edurt.datacap.sql.SqlBaseToken; + +public class KafkaParser + extends SqlParser +{ + public KafkaParser(String content) + { + super(content); + } + + @Override + public String getExecuteContext() + { + SqlBase sqlBase = this.getSqlBase(); + if (sqlBase.getToken() == SqlBaseToken.SHOW) { + return sqlBase.getTable(); + } + else if (sqlBase.getToken() == SqlBaseToken.SELECT) { + return sqlBase.getTable(); + } + return null; + } +} diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPlugin.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPlugin.java new file mode 100644 index 00000000..fc4e7873 --- /dev/null +++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPlugin.java @@ -0,0 +1,80 @@ +package io.edurt.datacap.plugin.natived.kafka; + +import io.edurt.datacap.spi.Plugin; +import io.edurt.datacap.spi.PluginType; +import io.edurt.datacap.spi.adapter.Adapter; +import io.edurt.datacap.spi.connection.JdbcConfigure; +import io.edurt.datacap.spi.model.Configure; +import io.edurt.datacap.spi.model.Response; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.lang3.ObjectUtils; + +@Slf4j +public class KafkaPlugin + implements Plugin +{ + private Configure configure; + private KafkaConnection connection; + private Response response; + + @Override + public String validator() + { + return "SHOW TOPICS"; + } + + @Override + public String name() + { + return "Kafka"; + } + + @Override + public String description() + { + return "Integrate Kafka data sources"; + } + + @Override + public PluginType type() + { + return PluginType.NATIVE; + } + + @Override + public void connect(Configure configure) + { + try { + this.response = new Response(); + this.configure = new JdbcConfigure(); + BeanUtils.copyProperties(this.configure, configure); + this.connection = new KafkaConnection(this.configure, this.response); + } + catch (Exception ex) { + this.response.setIsConnected(Boolean.FALSE); + this.response.setMessage(ex.getMessage()); + } + } + + @Override + public Response execute(String content) + { + if (ObjectUtils.isNotEmpty(this.connection)) { + log.info("Execute kafka plugin logic started"); + this.response = this.connection.getResponse(); + Adapter processor = new KafkaAdapter(this.connection, new KafkaParser(content)); + this.response = processor.handlerExecute(content); + log.info("Execute kafka plugin logic end"); + } + return this.response; + } + + @Override + public void destroy() + { + if (ObjectUtils.isNotEmpty(this.connection)) { + this.connection.destroy(); + } + } +} diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModule.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModule.java new file mode 100644 index 00000000..73aba34b --- /dev/null +++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModule.java @@ -0,0 +1,38 @@ +package io.edurt.datacap.plugin.natived.kafka; + +import com.google.inject.multibindings.Multibinder; +import io.edurt.datacap.spi.AbstractPluginModule; +import io.edurt.datacap.spi.Plugin; +import io.edurt.datacap.spi.PluginModule; +import io.edurt.datacap.spi.PluginType; + +public class KafkaPluginModule + extends AbstractPluginModule + implements PluginModule +{ + @Override + public String getName() + { + return "Kafka"; + } + + @Override + public PluginType getType() + { + return PluginType.NATIVE; + } + + @Override + public AbstractPluginModule get() + { + return this; + } + + protected void configure() + { + Multibinder module = Multibinder.newSetBinder(this.binder(), String.class); + module.addBinding().toInstance(this.getClass().getSimpleName()); + Multibinder plugin = Multibinder.newSetBinder(this.binder(), Plugin.class); + plugin.addBinding().to(KafkaPlugin.class); + } +} diff --git a/plugin/datacap-native-kafka/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule b/plugin/datacap-native-kafka/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule new file mode 100644 index 00000000..00dadf17 --- /dev/null +++ b/plugin/datacap-native-kafka/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule @@ -0,0 +1 @@ +io.edurt.datacap.plugin.natived.kafka.KafkaPluginModule diff --git a/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModuleTest.java b/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModuleTest.java new file mode 100644 index 00000000..0ca4d8ef --- /dev/null +++ b/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModuleTest.java @@ -0,0 +1,30 @@ +package io.edurt.datacap.plugin.natived.kafka; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.TypeLiteral; +import io.edurt.datacap.spi.Plugin; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Set; + +public class KafkaPluginModuleTest +{ + private Injector injector; + + @Before + public void before() + { + this.injector = Guice.createInjector(new KafkaPluginModule()); + } + + @Test + public void test() + { + Set plugins = injector.getInstance(Key.get(new TypeLiteral>() {})); + Assert.assertTrue(plugins.size() > 0); + } +} \ No newline at end of file diff --git a/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginTest.java b/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginTest.java new file mode 100644 index 00000000..c089963b --- /dev/null +++ b/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginTest.java @@ -0,0 +1,43 @@ +package io.edurt.datacap.plugin.natived.kafka; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.TypeLiteral; +import io.edurt.datacap.spi.Plugin; +import io.edurt.datacap.spi.model.Configure; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Optional; +import java.util.Set; + +public class KafkaPluginTest +{ + private Injector injector; + private Configure configure; + + @Before + public void before() + { + injector = Guice.createInjector(new KafkaPluginModule()); + configure = new Configure(); + configure.setHost("localhost:9092"); + } + + @Test + public void test() + { + Set plugins = injector.getInstance(Key.get(new TypeLiteral>() {})); + Optional pluginOptional = plugins.stream() + .filter(v -> v.name().equalsIgnoreCase("Kafka")) + .findFirst(); + if (pluginOptional.isPresent()) { + Plugin plugin = pluginOptional.get(); + plugin.connect(configure); + Assert.assertNotNull(plugin.execute(plugin.validator()).getConnection()); + plugin.destroy(); + } + } +} diff --git a/pom.xml b/pom.xml index ae9069b8..82f57069 100644 --- a/pom.xml +++ b/pom.xml @@ -18,6 +18,7 @@ plugin/datacap-native-alioss plugin/datacap-native-zookeeper plugin/datacap-native-redis + plugin/datacap-native-kafka plugin/datacap-http-cratedb plugin/datacap-http-clickhouse plugin/datacap-jdbc-clickhouse