Hi Tech Enthusiast! Pada Tech Monday Updates kali ini, kita bakal lanjutin dari artikel sebelumnya ya soal mengintegrasikan data dengan Kafka. Belum belum baca tentang Pengenalan Kafka? Kamu bisa baca di sini dulu, ya! Nah, kalau kemarin kita sudah berkenalan dengan Kafka, sekarang kita bakal bahas lebih lanjut tentang bagaimana mengintegrasikan data menggunakan Apache Kafka.
Seperti yang sudah kamu ketahui pada artikel sebelumnya, untuk bisa mengakses Kafka ini, kamu bisa akses melalui platform Confluent (www.confluent.io). Di dalam platform Confluent ini, ada banyak produk yang sangat berguna untuk untuk meningkatkan sebuah sistem aplikasi, salah satunya adalah Apache Kafka.
Ada 2 produk utama Kafka dalam platform ini, yaitu Kafka Brokers dan Kafka Clients. Pada Kafka Client, terdapat empat API, yaitu:
- Producer API: publish stream records ke satu atu lebih Kafka topik
- Consumer API: subscribe Kafka topik
- Stream API: transforming the input streams to output streams
- Connect API: integrasi data antara Kafka dengan sistem data lainnya
Kali ini kita akan fokus membahas cara integrasi data menggunakan Apache Kafka Client: Connect API!
Apache Kafka Client: Connect API (Kafka Connect)
Kafka Connect adalah API untuk integrasi data antara Kafka dengan sistem data lainnya. Contoh sistem data yang dimaksud adalah database RDMBS mysql, nosql mongodb, elasticsearch, big data, hadoop, dan lain sebagainya. Dengan API, membuat integrasi dengan Kafka menjadi lebih mudah bahkan dengan data yang sangat banyak. Kafka Connect mampu mengeksplorasi entire database atau per record ke dalam topik Kafka dan memungkinkan melakukan streaming data dengan latency yang sangat rendah.
Oh iya, Kafka Connect ini gratis lho! Hal ini karena open-source dari Apache Kafka® yang mana berfungsi sebagai data sentral untuk integrasi dengan sistem database dan filesystem.
Lantas, apa aja sih keuntungan memakai Kafka Connect ini?
- Data Centric Pipeline: Connect sangat berguna untuk abstraksi data untuk pull atau push data ke Kafka.
- Flexibility & Scalability: Connect berjalan dengan streaming atau sistem batch-oriented pada satu Node (standalone) atau banyak Node (distributed).
- Reusability & Extensibility: Connect memanfaatkan konektor yang ada atau bisa di-extend di kostum sesuai kebutuhan.
Selain itu, Kafka Connect ini bersifat fault-tolerant, jadi pada saat transmisi data, meskipun ada beberapa data yang gagal diterima, pesan tetap bisa diterima secara utuh. Dengan API Kafka Connect ini, operasional integrasi database jadi lebih gampang.
Ada 2 tipe konektor dari Kafka Connect:
- Source Connector — meng-export entire database atau stream insert/update/delete tabel ke topik Kafka. Konektor ini juga bisa collects metrics dari aplikasi server dan menyimpannya dalam topik Kafka.
- Sink connector — menerima data dari Kafka topic dan mengirim data ke sistem database nosql, big data, hadoop, dan lain-lain.
Baca juga: Cara Membuat REST API dengan Node.js
Konektor dan task adalah unit yang bekerja dan berjalan sebagai proses. Dalam Kafka Connect hal ini disebut worker. Ada 2 model worker: standalone mode dan distributed mode. Standalone mode hanya untuk environment development dan testing. Distributed mode untuk environment production.
- Standalone mode sangat berguna untuk development dan testing Kafka Connect di local. Ini juga berguna jika digunakan untuk production yang hanya mempunyai satu agen (contoh: kirim logs web server ke Kafka).
- Distributed mode worker pada banyak server, atau diistilahkan Connect Cluster. Kafka Connect berjalan secara terdistribusi across cluster. Kamu bisa menambah dan mengurangi Node server sesuai kebutuhan. Distributed mode juga lebih fault tolerant. Kalau 1 Node mati di dalam cluster, maka Kafka Connect secara otomatis mendistribusikan ke Node. Dan karena Kafka Connect menyimpan konfigurasi, status dan informasi offset di dalam Kafka cluster yang sudah direplikasi, maka jika salah satu node mati gak bakalan menghilangkan data.
Selanjutnya kita coba running worker dengan Standalone Mode, ya!
Konfigurasi dan Running Worker
Contoh command untuk menjalankan Kafka Connect:
bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]
- properties
adalah worker configuration properties file.
worker.propertieshanyalah contoh nama, anda bisa menggantinya dengan apapun. Dengan file ini anda bisa mengontrol seperti Kafka cluster yang digunakan dan format serialization.
- properties[…]
adalah konfigurasi konektor. Semua konektor mempunyai properties konfigurasi yang telah ada didalam worker. Anda bisa menjalankan berbagai macam konektor menggunakan command ini. Anda bisa melihat contoh properties konektor di
Kalau kamu menjalankan beberapa standalone workers dalam satu host server, maka 2 properties konfigurasi ini harus unik setiap worker:
- storage.file.filename:
nama storage file untuk offsets dari konektor.
- port:
port REST interface untuk listen HTTP request.
Command untuk mengetahui list konektor yang tersedia:
Output
Untuk tutorial ini, kita coba memakai Mysql sebagai sumber data, ya. Konektor Mysql yang dipakai adalah Debezium. Secara default, konektor ini meng-export data dalam database dan dimasukkan ke output topik. Proses Insert/Update/Delete akan tercatat sebagai message baru pada output topik.
Prerequisites
- mysql 8
Tutorial
- Instal konektor:
confluent-hub install debezium/debezium-connector-mysql:latest
- Menambah sebuah konektor baru harus restart Kafka Connect. gunakan Confluent CLI untuk restart Kafka Connect:
confluent local services connect stop && confluent local services connect start
- Cek Mysql plugin terinstall dengan benar dengan cara:
curl -sS localhost:8083/connector-plugins | grep mysql
- Buat file register-mysql.json
dengan isi sebagai berikut:
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "schema-changes.inventory" } }
- Start The Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
- Start Kafka Consumer
confluent local services kafka consume dbserver1.inventory.customers --from-beginning
Nah, itu tadi cara mengintegrasikan data dengan Apache Kafka. Artikel ini juga merupakan ringkasan dari online class minggu lalu bersama saya. Kalau di antara dari kamu punya pengalaman yang mau di-share, atau sekadar kasih feedback, boleh tulis di kolom komentar, ya. Selamat mencoba mengintegrasikan data!
Comments ( 0 )