Bas Geerdink | September 25, 2019 | Strata New York
{
  "name": "Bas Geerdink",
  "role": "Technology Lead",
  "background": ["Artificial Intelligence", "Informatics"],
  "mixins": ["Software engineering", "Architecture", "Management", "Innovation"],
  "twitter": "@bgeerdink",
  "linked_in": "bgeerdink"
}
| Sector | Data source | Pattern | Notification |
|---|---|---|---|
| Marketing | Tweets | Trend analysis | Actionable insights |
| Finance | Payment data | Fraud detection | Block money transfer |
| Insurance | Page visits | Customer is stuck in form | Chat window |
| Healthcare | Patient data | Heart failure | Alert doctor |
| Traffic | Cars passing | Traffic jam | Update route info |
| Internet of Things | Machine logs | System failure | Alert to sys admin |
| Gaming | Player actions | Key combos | Action on screen |
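Every row in the table follows the same shape: a stream of events, a pattern to detect, and a notification to emit. A minimal sketch of that dispatch loop in plain Python (the detector rule and notification text are illustrative, not from the talk):

```python
# Sketch of the stream -> pattern -> notification shape from the table above.
# The fraud rule and threshold are hypothetical placeholders.

def detect_fraud(event):
    # Hypothetical rule: flag unusually large transfers
    return event.get("amount", 0) > 10_000

DETECTORS = {
    # pattern name -> (detector function, notification to emit)
    "fraud": (detect_fraud, "block money transfer"),
}

def process(event):
    """Run every detector against one event; return triggered notifications."""
    return [notification
            for detector, notification in DETECTORS.values()
            if detector(event)]

notifications = process({"amount": 25_000, "customer_id": 42})
```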
Spark configuration: spark.executor.cores=2, spark.task.cpus=1

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("Styx")

// connect to Spark
val spark = SparkSession
  .builder
  .config(conf)
  .getOrCreate()

import spark.implicits._

// get the data from Kafka: subscribe to topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "tweets")
  .option("startingOffsets", "earliest")
  .load()

Example: a sliding window of 1 day, evaluated every 15 minutes, grouped by the field 'customer_id'. The event time is stored in the field 'transaction_time'.

val windowedTransactions = transactionStream
  .groupBy(
    window($"transaction_time", "1 day", "15 minutes"),
    $"customer_id")
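The window/slide semantics can be illustrated outside Spark: because the windows overlap, an event with a 1-day window sliding every 15 minutes lands in 96 windows at once. A plain-Python sketch (timestamps in seconds; the function name is mine, not a Spark API):

```python
def windows_for(ts, size, slide):
    """Return the [start, end) windows of length `size`, sliding every
    `slide` seconds, that contain event time `ts`."""
    # Earliest window start that still covers ts, aligned to the slide interval
    first = (ts // slide) * slide - size + slide
    return [(start, start + size)
            for start in range(max(first, 0), ts + 1, slide)
            if start <= ts < start + size]

# A 1-day window evaluated every 15 minutes: each event falls in 96 windows
day_windows = windows_for(ts=100_000, size=86_400, slide=900)
```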
Example: a sliding window of 60 seconds, evaluated every 30 seconds. The watermark is set at 1 second, giving late events one second of event time to arrive.

val windowedTweets = tweetStream
  .withWatermark("created_at", "1 second")
  .groupBy(
    window($"created_at", "60 seconds", "30 seconds"),
    $"word")
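The watermark rule itself is simple: an event is dropped once it lags more than the watermark delay behind the maximum event time seen so far. A stdlib-Python sketch of that rule (class and method names are mine, not Spark's):

```python
# Sketch of watermark semantics, with event times in seconds.
class WatermarkFilter:
    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = float("-inf")

    def accept(self, event_time):
        """Return True if the event is on time, False if it is too late."""
        self.max_event_time = max(self.max_event_time, event_time)
        return event_time >= self.max_event_time - self.delay

wm = WatermarkFilter(delay=1)
on_time = wm.accept(10.0)   # first event: accepted
in_grace = wm.accept(9.5)   # within 1 second of the max: accepted
too_late = wm.accept(8.0)   # more than 1 second behind: dropped
```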
import pandas
from sklearn.linear_model import LogisticRegression
from sklearn2pmml import sklearn2pmml
from sklearn2pmml.pipeline import PMMLPipeline

events_df = pandas.read_csv("events.csv")
pipeline = PMMLPipeline(...)
pipeline.fit(events_df, events_df["notifications"])
sklearn2pmml(pipeline, "LogisticRegression.pmml", with_repr=True)
def score(event: RichBusinessEvent, pmmlModel: PmmlModel): Double = {
  val arguments = new util.LinkedHashMap[FieldName, FieldValue]
  for (inputField: InputField <- pmmlModel.getInputFields.asScala) {
    val fieldName = inputField.getField.getName
    arguments.put(fieldName,
      inputField.prepare(event.customer.all(fieldName.getValue)))
  }
  // evaluate the model and return the notification's relevancy score
  val results = pmmlModel.evaluate(arguments)
  pmmlModel.getTargetFields.asScala.headOption match {
    case Some(targetField) =>
      results.get(targetField.getName).asInstanceOf[Double]
    case _ => throw new Exception("No valid target")
  }
}
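Under the hood, evaluating the exported logistic-regression PMML boils down to a sigmoid over a weighted feature sum. A pure-Python sketch of that computation (the weights, intercept, and field names are hypothetical; a real deployment takes them from LogisticRegression.pmml):

```python
import math

# Hypothetical coefficients, standing in for the values stored in the PMML file.
WEIGHTS = {"page_visits": 0.8, "time_in_form": 1.2}
INTERCEPT = -3.0

def score(event):
    """Relevancy score in (0, 1): sigmoid of the weighted feature sum."""
    z = INTERCEPT + sum(w * event.get(name, 0.0)
                        for name, w in WEIGHTS.items())
    return 1.0 / (1.0 + math.exp(-z))

relevancy = score({"page_visits": 3, "time_in_form": 2.5})
```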
Please rate my session on the website or app :)
Read more about streaming analytics at:
Source code and presentation are available at: