Ifølge fejlen har du allerede en streng (du gjorde allerede df.selectExpr("CAST(value AS STRING)")
), så du bør prøve at få Row-begivenheden som en String
, og ikke en Array[Byte]
Start med at ændre
val valueStr = new String(record.getAs[Array[Byte]]("value"))
til
val valueStr = record.getAs[String]("value")
Jeg forstår, at du muligvis allerede har en klynge til at køre Spark-kode, men jeg vil foreslå, at du stadig kigger på Kafka Connect Mongo Sink Connector så du ikke behøver at skrive og vedligeholde din egen Mongo-skribent i Spark-kode.
Eller du kan skrive Spark-datasæt til mongo også direkte