I am trying to read data from a Kafka topic into Spark. I am running Kafka on Docker Desktop. This is my pom:
<groupId>com.dell</groupId>
<artifactId>sparkdemo2</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-bundle</artifactId>
        <version>1.11.1026</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-dell</artifactId>
        <version>1.4.3</version>
    </dependency>
    <!--
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>1.4.3</version>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.13</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>com.emc.ecs</groupId>
        <artifactId>object-client-bundle</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark-runtime-3.5_2.12</artifactId>
        <version>1.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.13</artifactId>
        <version>3.3.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>3.0.0-M7</version>
            <configuration>
                <argLine>--add-exports java.base/sun.nio.ch=ALL-UNNAMED</argLine>
            </configuration>
        </plugin>
    </plugins>
</build>
And this is my code:
package com.dell;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Main {
    public static void main(String[] args) {
        // Define Kafka parameters
        SparkSession spark = SparkSession.builder()
                .appName("Kafka Spark Demo")
                .master("local[*]")
                .getOrCreate();

        String kafkaBrokers = "localhost:9092";
        String kafkaTopic = "Topic_Project";

        Dataset<Row> kafkaStreamDf = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load();

        kafkaStreamDf.show(5);
    }
}
I keep getting this error:
Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide". at org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError
I can't seem to figure out the issue here. Any ideas would be helpful.
Answers
The error message indicates that Spark cannot find the Kafka data source, which is required for reading from Kafka topics. This typically happens when the Kafka integration dependency is not on the runtime classpath or is not properly configured.
Here are a few steps you can take to troubleshoot and resolve this issue:
1. Check Dependencies: You have added the spark-sql-kafka-0-10_2.13 dependency to your pom.xml, but it is declared with <scope>test</scope>. Test-scoped dependencies are only on the classpath while tests run, so the Kafka source is missing when you launch main() directly. Remove the scope element (or set it to compile, the default); a corrected dependency block is sketched after this list.

2. Check Spark Version: Make sure the version of spark-sql-kafka-0-10 matches the version of Spark you are using. You are on Spark 3.3.2, so version 3.3.2 of the connector is correct. Also keep the Scala suffix consistent across Spark-related artifacts: your Spark artifacts use _2.13, but iceberg-spark-runtime-3.5_2.12 is built for Scala 2.12 (and targets Spark 3.5); for your setup the matching artifact would be iceberg-spark-runtime-3.3_2.13.

3. Package Assembly: If you are running your application in standalone mode (not in an environment managed by an orchestration tool like YARN or Kubernetes), make sure you build a fat JAR (also known as an uber JAR or assembly JAR) that includes all required dependencies, for example with the Maven Assembly Plugin or Maven Shade Plugin (a Shade sketch follows this list).

4. Check Classpath: If you run the application with spark-submit, make sure the fat JAR containing all dependencies is on the classpath, or let spark-submit fetch the connector with --packages (example below).

5. Check Kafka Connection: Ensure your Kafka broker is running and reachable from your Spark application, and verify that kafka.bootstrap.servers points to the correct broker(s). Since Kafka runs under Docker Desktop, also confirm that port 9092 is published to the host (a quick check is shown below).

6. Verify Configuration: Double-check the options passed on the readStream() chain. Ensure the Kafka-related options (kafka.bootstrap.servers, subscribe, startingOffsets, etc.) are correctly specified.

7. Check Spark Logs: Check the Spark application logs for any additional error messages or warnings that may provide insight, especially exceptions related to Kafka data source initialization.
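To make step 1 concrete, here is your Kafka connector dependency with the test scope removed; everything else is copied from your existing pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.13</artifactId>
    <version>3.3.2</version>
</dependency>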
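For step 3, here is a minimal Maven Shade Plugin sketch (the plugin version 3.4.1 is my assumption; any recent release should do). The ServicesResourceTransformer is the important part: Spark discovers data sources through META-INF/services files, and if the shade step does not merge them you get exactly this "Failed to find data source" error even with the dependency bundled:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.4.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Merge META-INF/services entries (DataSourceRegister) from all jars -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>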
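For step 4, if you prefer not to shade, spark-submit can pull the connector from Maven Central with --packages. The jar path below assumes Maven's default target/ layout and follows from your artifactId and version:

spark-submit \
  --class com.dell.Main \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.2 \
  target/sparkdemo2-1.0-SNAPSHOT.jar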
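For step 5, one quick sanity check from the host, assuming a hypothetical container name of kafka (depending on the image, the script may be kafka-topics rather than kafka-topics.sh):

docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --list

If your topic Topic_Project does not appear, the broker or the port mapping is the problem rather than Spark.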
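Finally, note a second problem you will hit once the data source resolves: show() cannot be called on a streaming Dataset; Spark raises "Queries with streaming sources must be executed with writeStream.start()". A minimal sketch that prints incoming batches to the console instead, reusing the kafkaStreamDf from your code (you will need an import for org.apache.spark.sql.streaming.StreamingQuery, and main must declare throws Exception since start() and awaitTermination() throw checked exceptions):

// Kafka delivers key/value as binary, so cast them to strings first.
StreamingQuery query = kafkaStreamDf
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream()
        .format("console")       // print each micro-batch to stdout
        .outputMode("append")
        .start();                // begins the streaming query

query.awaitTermination();        // blocks until the query stops or fails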
By following these steps and ensuring that your dependencies are correctly configured, you should be able to resolve the issue and successfully read data from Kafka topics using Spark Structured Streaming.