Databricks Example
Because Databricks functions as an external system, you can develop your own business logic using Spark within it. From the perspective of Databricks, Macrometa serves as both a source (for reading inputs) and a target (for persisting outputs).
This example shows you how to read data from a Macrometa stream and write data to a Macrometa collection using the Macrometa Databricks Client Connector.
Read from a Macrometa Stream
- Set up your source options:
  val sourceOptions = Map(
    "regionUrl" -> regionUrl,
    "port" -> "6651",
    "apikey" -> apikey,
    "fabric" -> fabric,
    "tenant" -> tenant,
    "replication" -> replication,
    "stream" -> sourceStream,
    "subscriptionName" -> sourceSubscription
  )
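- Create a Spark session:
  // DataFrame and SaveMode are used later, when writing to the collection.
  import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

  val spark = SparkSession.builder()
    .appName("MacrometaCollectionStreamingApp")
    .master("local[*]")
    .getOrCreate()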
- Read from the Macrometa stream:
  val inputStream = spark.readStream
    .format("com.macrometa.spark.stream.MacrometaTableProvider")
    .options(sourceOptions)
    .load()
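A missing or blank source option typically only surfaces once the stream starts, so it can help to check the options map before handing it to Spark. The following is a minimal sketch, not part of the Macrometa connector: the `SourceOptions` object and its `validate` method are hypothetical names, and the list of required keys is simply the set of keys used in the source options in this example.

```scala
object SourceOptions {
  // Option keys used for the stream source in this example.
  val requiredKeys: Seq[String] = Seq(
    "regionUrl", "port", "apikey", "fabric",
    "tenant", "replication", "stream", "subscriptionName"
  )

  /** Returns Right(options) when every required key has a non-blank value,
    * otherwise Left with the missing keys, in the order listed above. */
  def validate(options: Map[String, String]): Either[Seq[String], Map[String, String]] = {
    // A key counts as missing when it is absent, null, or only whitespace.
    val missing = requiredKeys.filter { key =>
      options.get(key).flatMap(Option(_)).forall(_.trim.isEmpty)
    }
    if (missing.isEmpty) Right(options) else Left(missing)
  }
}
```

Calling `SourceOptions.validate(sourceOptions)` before `spark.readStream` lets the application stop with a clear message listing the missing keys instead of failing inside the streaming query.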
Write to a Macrometa Collection
- Set up your target options:
  val targetOptions = Map(
    "regionUrl" -> regionUrl,
    // Assumption: the value is the literal prefix "apikey " followed by your API key.
    "apiKey" -> s"apikey $apikey",
    "fabric" -> fabric,
    "collection" -> "<YOUR_TARGET_COLLECTION>",
    "batchSize" -> "100"
  )
- Write to the Macrometa collection: 
  val query = inputStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .format("com.macrometa.spark.collection.MacrometaTableProvider")
      .options(targetOptions)
      .mode(SaveMode.Append)
      .save()
  }
  .option("checkpointLocation", "checkpoint")
  .start()
- Wait for the streaming query to terminate, then close the Spark session:
  query.awaitTermination()
  spark.stop()
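Because the business logic runs in Spark, one way to organize it is as a plain DataFrame-to-DataFrame function that is applied to each micro-batch before the write. The sketch below assumes nothing about the stream's schema: it only appends two columns. The `BatchLogic` object and the `batch_id` and `processed_at` column names are hypothetical, and whether the target collection accepts the extra fields is an assumption to verify against your own setup.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.{current_timestamp, lit}

object BatchLogic {
  // Hypothetical business logic: tag every row with the micro-batch ID
  // and the time at which the batch was processed.
  def enrich(batchDF: DataFrame, batchId: Long): DataFrame =
    batchDF
      .withColumn("batch_id", lit(batchId))
      .withColumn("processed_at", current_timestamp())

  // Applies the logic above, then appends the result to the target
  // collection using the same format string as the write step.
  def writeBatch(targetOptions: Map[String, String])(batchDF: DataFrame, batchId: Long): Unit =
    enrich(batchDF, batchId).write
      .format("com.macrometa.spark.collection.MacrometaTableProvider")
      .options(targetOptions)
      .mode(SaveMode.Append)
      .save()
}
```

Inside the `foreachBatch` block, the direct `batchDF.write` chain can then be swapped for `BatchLogic.writeBatch(targetOptions)(batchDF, batchId)`. Keeping `enrich` separate from the write makes the logic testable against an in-memory DataFrame without a Macrometa connection.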