Bas Geerdink | December 13, 2019 | Functional Scala
{
"name": "Bas Geerdink",
"role": "Technology Lead",
"background": ["Artificial Intelligence",
"Informatics"],
"mixins": ["Software engineering",
"Architecture",
"Management",
"Innovation"],
"twitter": "@bgeerdink",
"linked_in": "bgeerdink"
}
Sector | Data source | Pattern | Notification |
---|---|---|---|
Finance | Payment data | Fraud detection | Block money transfer |
Finance | Clicks and page visits | Trend analysis | Actionable insights |
Insurance | Page visits | Customer is stuck in a web form | Chat window |
Healthcare | Patient data | Heart failure | Alert doctor |
Traffic | Cars passing | Traffic jam | Update route info |
Internet of Things | Machine logs | System failure | Alert to sys admin |
Spark Structured Streaming builds on Spark SQL Datasets and DataFrames, which are no longer based on the 'batch-driven' RDD API.
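As a rough sketch of how the typed and untyped APIs relate (assuming a SparkSession named spark and a hypothetical Transaction case class, not taken from the slides):

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._ // encoders for case classes

// hypothetical event type, for illustration only
case class Transaction(id: String, amount: Double)

// a DataFrame is an alias for Dataset[Row]: untyped rows
val df: DataFrame = spark.read.json("transactions.json")

// .as[...] turns it into a typed, compile-time-checked Dataset
val ds: Dataset[Transaction] = df.as[Transaction]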
// configure Spark to run locally with 8 worker threads
val conf = new SparkConf()
  .setMaster("local[8]")
  .setAppName("FraudNumberOfTransactions")
./bin/spark-submit --name "LowMoneyAlert" --master local[4] \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.dynamicAllocation.maxExecutors=2" styx.jar
// connect to Spark
val spark = SparkSession
  .builder
  .config(conf)
  .getOrCreate()
// implicit encoders and conversions for Datasets and DataFrames
import spark.implicits._
// get the data from Kafka: subscribe to topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .option("startingOffsets", "latest")
  .load()
// split up a tweet into separate words
val tweetStream = df
  .selectExpr("CAST(value AS STRING)") // the Kafka value column is binary
  .as[String]
  .map(Tweet.fromString)               // parse into an Option[Tweet]
  .filter(_.isDefined).map(_.get)      // drop tweets that failed to parse
  // create multiple TweetWord objects from one tweet
  .flatMap(tweet => {
    val words = tweet.text
      .toLowerCase()
      .split("[ \\t]+") // split the tweet text into words
    words.map(word => TweetWord(tweet.created_at, word))
  })
  .filter(tw => !wordsToIgnore.contains(tw.word)
    && tw.word.length >= minimumWordLength)
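The Tweet and TweetWord types used above are not defined on the slides. A minimal sketch of what they could look like, assuming (purely for illustration) a comma-separated "<epoch-millis>,<tweet text>" payload:

import java.sql.Timestamp
import scala.util.Try

// sketch only: the real definitions live in the talk's source code repository
case class Tweet(created_at: Timestamp, text: String)

object Tweet {
  def fromString(s: String): Option[Tweet] =
    s.split(",", 2) match {
      case Array(ts, text) => Try(Tweet(new Timestamp(ts.trim.toLong), text)).toOption
      case _               => None
    }
}

case class TweetWord(created_at: Timestamp, word: String)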
Example: sliding window of 1 hour, evaluated every 15 minutes. The event time is stored in the field 'created_at'. Aggregation is done by counting the words per group, per window.
// aggregate, produces a sql.DataFrame
val windowedTransactions = tweetStream
  .groupBy(
    window($"created_at", "1 hour", "15 minutes"),
    $"word")
  .agg(count("word") as "count", $"word", $"window.end")
Example: the same sliding window of 1 hour, evaluated every 15 minutes, now with a watermark of 1 minute, giving late events some time to arrive before a window is finalized.
// aggregate, produces a sql.DataFrame
val windowedTransactions = tweetStream
  .withWatermark("created_at", "1 minute") // <--- watermark
  .groupBy(
    window($"created_at", "1 hour", "15 minutes"),
    $"word")
  .agg(count("word") as "count", $"word", $"window.end")
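To actually start the query, the aggregated stream still needs a sink. A minimal sketch using the console sink (for local experimentation only, not part of the original slides):

// start the streaming query; 'update' mode emits changed window counts as they arrive
val query = windowedTransactions
  .writeStream
  .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()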
import pandas
from sklearn.linear_model import LogisticRegression
from sklearn2pmml import sklearn2pmml
from sklearn2pmml.pipeline import PMMLPipeline

# train a model on historical events and export it as PMML
events_df = pandas.read_csv("events.csv")
pipeline = PMMLPipeline(...)
pipeline.fit(events_df, events_df["notifications"])
sklearn2pmml(pipeline, "LogisticRegression.pmml", with_repr = True)
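On the Scala side, the exported PMML file can be loaded with the JPMML evaluator. A sketch assuming the jpmml-evaluator library (the talk's own PmmlModel wrapper is not shown here):

import java.io.File
import org.jpmml.evaluator.{Evaluator, LoadingModelEvaluatorBuilder}

// load the PMML file that was exported from Python
val evaluator: Evaluator = new LoadingModelEvaluatorBuilder()
  .load(new File("LogisticRegression.pmml"))
  .build()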
def score(event: RichBusinessEvent, pmmlModel: PmmlModel): Double = {
  val arguments = new util.LinkedHashMap[FieldName, FieldValue]
  // map each input field of the PMML model to a value from the customer data
  for (inputField: InputField <- pmmlModel.getInputFields.asScala) {
    arguments.put(inputField.getField.getName,
      inputField.prepare(customer.all(inputField.getField.getName.getValue)))
  }
  // evaluate the model and return the notification's relevancy score
  val results = pmmlModel.evaluate(arguments)
  pmmlModel.getTargetFields.asScala.headOption match {
    case Some(targetField) => results.get(targetField.getName).asInstanceOf[Double]
    case _ => throw new Exception("No valid target")
  }
}
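A hedged usage sketch: the relevancy score can then be used to filter events in the stream before a notification is sent (the enrichedEvents stream, the loaded model, and the 0.5 threshold are all illustrative, not from the slides):

// keep only events whose notification is relevant enough
val notifications = enrichedEvents
  .map(event => (event, score(event, model)))
  .filter { case (_, relevancy) => relevancy > 0.5 }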
Read more about streaming analytics at:
Source code and presentation are available at: