I have not used Spark for a long time. Today I tried to set up Spark to integrate with Kafka streaming, and I found the ecosystem has changed a lot.
The releases I downloaded for Spark and Kafka are:
- spark-3.3.0-bin-hadoop3.tgz (Spark 3.3)
- kafka_2.12-3.2.0.tgz (Kafka 3.2)
Although Scala 2.13 has been out for a long time, both Spark and Kafka are still developed mainly against Scala 2.12. When downloading the software, we should choose the builds that use Scala 2.12.
First, we install Scala 2.12 on the system. My system is Ubuntu 20.04, x64. Follow the steps below to install Java, SDKMAN, Scala, and sbt:
$ sudo apt install openjdk-11-jre
$ curl -s "https://get.sdkman.io" | bash
$ source "$HOME/.sdkman/bin/sdkman-init.sh"   # make the sdk command available in the current shell
$ sdk install scala 2.12.15
$ sdk install sbt
After that, download the Spark and Kafka packages listed above, untar them, and move them to the /opt directory. So in /opt I have:
$ ls /opt
kafka spark
Then put these settings in the .bash_profile file in the user's home directory:
source "/home/pyh/.sdkman/bin/sdkman-init.sh"
export SPARK_HOME=/opt/spark
export JAVA_HOME=/usr
export PATH=/opt/kafka/bin:/opt/spark/bin:$PATH
I have a script, kafka.sh, for managing the Kafka service, whose content is as follows:
#!/bin/bash
# simple helper for managing the local Kafka/ZooKeeper services
ACT=$1
TOP=$2
PRE="/opt/kafka"

if [ -z "$ACT" ]; then
  echo "Usage: $0 action [topic]"
  exit 1
fi

if [ -z "$TOP" ]; then
  TOP="quickstart-events"
fi

if [ "$ACT" == "produce" ]; then
  $PRE/bin/kafka-console-producer.sh --topic "$TOP" --bootstrap-server localhost:9092
elif [ "$ACT" == "consume" ]; then
  $PRE/bin/kafka-console-consumer.sh --topic "$TOP" --from-beginning --bootstrap-server localhost:9092
elif [ "$ACT" == "create" ]; then
  $PRE/bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic "$TOP" --bootstrap-server localhost:9092
elif [ "$ACT" == "desc" ]; then
  $PRE/bin/kafka-topics.sh --describe --topic "$TOP" --bootstrap-server localhost:9092
elif [ "$ACT" == "startzk" ]; then
  $PRE/bin/zookeeper-server-start.sh $PRE/config/zookeeper.properties
elif [ "$ACT" == "start" ]; then
  $PRE/bin/kafka-server-start.sh $PRE/config/server.properties
fi
I use this script to start the Kafka processes and create a topic:
$ kafka.sh startzk # start ZooKeeper
$ kafka.sh start # start Kafka
$ kafka.sh create mytest # create a topic
These commands should be run in three separate terminals, since the first two keep running in the foreground. The last one creates a topic named "mytest".
Now I produce messages to Kafka with a Ruby script:
$ cat produce.rb
require 'kafka'

kafka = Kafka.new("localhost:9092", client_id: "ruby-client", resolve_seed_brokers: true)
producer = kafka.producer(required_acks: :all, max_buffer_size: 50_000)

# buffer 1000 random messages, then deliver them to the broker in one batch
1000.times do
  message = rand.to_s
  producer.produce(message, key: "key1", topic: "mytest")
end

producer.deliver_messages
To keep publishing messages continuously, we can loop it in the shell:
$ while true; do ruby produce.rb; sleep 5; done
Now the messages are being published to Kafka successfully. Next I have to read the stream from Kafka into Spark and aggregate the messages there.
To set up a Scala project:
$ mkdir myproject
$ cd myproject
$ mkdir -p src/main/scala
$ touch build.sbt
$ touch src/main/scala/sparkafka.scala
Here is the content of build.sbt:
name := "sparkafka"
version := "0.1"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0"
And the source code in sparkafka.scala:
import org.apache.spark.sql.SparkSession

object Sparkafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Mykafka").getOrCreate()

    // read the "mytest" topic as a streaming DataFrame
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "mytest")
      .load()

    import spark.implicits._
    // Kafka delivers key and value as binary columns; cast them to strings
    val messages = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // count messages per key (this aggregates on df's raw binary key column)
    val myCount = df.groupBy("key").count()

    val query = myCount.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
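Note that the aggregation runs on df's raw binary key column, which is why the console output below shows the key as [6B 65 79 31] (the hex bytes of "key1") rather than the string itself. If you prefer readable keys, a minimal variant (not what produced the output below) is to cast the key before grouping:
// variant: cast the key to a string first so the console shows "key1"
val readableCount = df
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .groupBy("key")
  .count()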
In the myproject directory, I run this command to compile and package the project:
$ sbt package
...
[success] Total time: 4 s, completed Jul 7, 2022, 4:39:19 PM
Go to Spark's configuration directory and change the log level in log4j2.properties to error (if the file does not exist yet, copy it from log4j2.properties.template):
$ cat /opt/spark/conf/log4j2.properties |grep error
rootLogger.level = error
The last step is to submit the job to Spark. In the myproject directory, run the command below:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --class "Sparkafka" --master local[2] target/scala-2.12/sparkafka_2.12-0.1.jar
Here is the output in the terminal:
-------------------------------------------
Batch: 1
-------------------------------------------
+-------------+-----+
| key|count|
+-------------+-----+
|[6B 65 79 31]| 2000|
+-------------+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+-------------+-----+
| key|count|
+-------------+-----+
|[6B 65 79 31]| 4000|
+-------------+-----+
-------------------------------------------
Batch: 3
-------------------------------------------
+-------------+-----+
| key|count|
+-------------+-----+
|[6B 65 79 31]| 5000|
+-------------+-----+
-------------------------------------------
Batch: 4
-------------------------------------------
+-------------+-----+
| key|count|
+-------------+-----+
|[6B 65 79 31]| 7000|
+-------------+-----+
Finally we got the job running correctly. For a production deployment, we would read the stream from Kafka and, after aggregation, write the results to a store such as Redis or MySQL instead of printing them to the terminal as in this sample.
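As a rough illustration of that last point, here is a minimal sketch of a JDBC sink using foreachBatch. It assumes a hypothetical MySQL database mydb with a table key_counts, placeholder credentials, and the MySQL JDBC driver on the classpath; it would replace the console writer in the job above:
import org.apache.spark.sql.DataFrame

// write each micro-batch of the aggregated counts to MySQL via JDBC
// (URL, table, and credentials below are placeholders)
val query = myCount.writeStream
  .outputMode("complete")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mydb")
      .option("dbtable", "key_counts")
      .option("user", "myuser")
      .option("password", "mypassword")
      .mode("overwrite") // complete mode re-emits all counts every batch
      .save()
  }
  .start()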