sql >> Database teknologi >  >> NoSQL >> MongoDB

Sænk Kafka Stream til MongoDB ved hjælp af PySpark Structured Streaming

Jeg fandt en løsning. Da jeg ikke kunne finde den rigtige Mongo-driver til Structured Streaming, arbejdede jeg på en anden løsning. Nu bruger jeg den direkte forbindelse til mongoDb og bruger "foreach(...)" i stedet for foreachbatch(. ..). Min kode ser sådan ud i filen testSpark.py:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()



  1. Kom godt i gang med Redis Client API'er

  2. Brug af ekstern mongodb med meteorudvikling

  3. Tilfældige dokumenter fra MongoDB ved hjælp af spring-data

  4. docker-compose for at køre django med mongodb