Bas Geerdink | July 17, 2019 | O'Reilly OSCon
{
"name": "Bas Geerdink",
"role": "Technology Lead",
"company": "ING Bank",
"background": ["Artificial Intelligence",
"Informatics"],
"mixins": ["Software engineering",
"Architecture",
"Management",
"Innovation"],
"twitter": "@bgeerdink",
"linked_in": "bgeerdink"
}
Sector | Data source | Pattern | Notification |
---|---|---|---|
Marketing | Tweets | Trend analysis | Actionable insights |
Finance | Payment data | Fraud detection | Block money transfer |
Insurance | Page visits | Customer is stuck in form | Chat window |
Healthcare | Patient data | Heart failure | Alert doctor |
Traffic | Cars passing | Traffic jam | Update route info |
Internet of Things | Machine logs | System failure | Alert to sys admin |
Gaming | Player actions | Key combos | Action on screen |
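import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

// KafkaSchemaFactory, readTopicDef, rawEventFromPayload, toStringList, props,
// doSomething and writeToOutputStream are project-specific helpers defined elsewhere
def getData(env: StreamExecutionEnvironment): Unit = {
  // create schema for deserialization
  val readSchema = KafkaSchemaFactory.createKeyedDeserializer(readTopicDef, rawEventFromPayload)
  // consume events
  val rawEventSource = new FlinkKafkaConsumer010[BaseKafkaEvent](
    toStringList(readTopicDef), readSchema, props)
  // connect to source and handle events; addSink ends the pipeline,
  // so this method returns Unit rather than a DataStream
  env.addSource(rawEventSource)
    .filter(_.isSuccess) // deserialization must succeed
    .flatMap(event => doSomething(event))
    .addSink(writeToOutputStream)
}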
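The source, filter, flatMap and sink steps in getData can be modelled without Flink as a chain of lazy generators. This is a minimal sketch, not Flink code: it assumes JSON payloads, and `deserialize`, `run_pipeline`, `do_something` and `sink` are hypothetical names. A failed deserialization yields `None`, which the filter step drops, mirroring `.filter(_.isSuccess)`.

```python
import json
from typing import Callable, Iterable, Optional

def deserialize(payload: bytes) -> Optional[dict]:
    """Parse a JSON payload (an assumption); return None if deserialization fails."""
    try:
        return json.loads(payload)
    except ValueError:  # covers json.JSONDecodeError and invalid UTF-8
        return None

def run_pipeline(payloads: Iterable[bytes],
                 do_something: Callable[[dict], Iterable[dict]],
                 sink: Callable[[dict], None]) -> None:
    """source -> deserialize -> filter(success) -> flatMap -> sink, evaluated lazily."""
    parsed = (deserialize(p) for p in payloads)
    valid = (event for event in parsed if event is not None)  # filter step
    for event in valid:
        for result in do_something(event):  # flatMap: zero or more outputs per event
            sink(result)
```

For example, passing `[b'{"amount": 5}', b'not json', b'{"amount": 0}']` with a `do_something` that keeps only positive amounts and `out.append` as the sink leaves `out` holding just the first event: the malformed payload is filtered out and the zero amount produces no output.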
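import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// assumed shape of the output record
case class WordCount(windowEnd: Long, word: String, count: Int)

// WindowFunction[IN, OUT, KEY, W]:
// in: (word, count); out: WordCount(end of window, word, count);
// key: the word; window: a TimeWindow
class WordCountWindowFunction extends
    WindowFunction[(String, Int), WordCount, String, TimeWindow] {
  def apply(key: String,
            window: TimeWindow,
            input: Iterable[(String, Int)],
            out: Collector[WordCount]): Unit = {
    // sum the per-element counts of this word within the window
    val count = input.map(_._2).sum
    out.collect(WordCount(window.getEnd, key, count))
  }
}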
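The grouping that WordCountWindowFunction relies on can be sketched in plain Python for tumbling (fixed-size, non-overlapping) event-time windows. This is an illustrative model, not Flink's implementation; the function name and the `(timestamp_ms, word, count)` input shape are assumptions. As with Flink's `TimeWindow.getEnd`, the reported end is exclusive: a window covers `[start, start + size)`.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

def tumbling_word_count(events: Iterable[Tuple[int, str, int]],
                        window_ms: int) -> List[Tuple[int, str, int]]:
    """events: (timestamp_ms, word, count) with non-negative timestamps.
    Returns (window_end, word, total) per window and word, sorted."""
    totals: Dict[Tuple[int, str], int] = defaultdict(int)
    for ts, word, count in events:
        window_start = ts - (ts % window_ms)  # align to the window grid
        totals[(window_start + window_ms, word)] += count
    return sorted((end, word, total) for (end, word), total in totals.items())
```

With a window size of 1000 ms, events at 1000 ms and 1999 ms land in the same window (end 2000), while an event at 2500 ms starts the next one (end 3000).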
env.addSource(rawEventSource)
.filter(_.isSuccess) // deserialize must succeed
.assignTimestampsAndWatermarks(
new TimedEventWatermarkExtractor)
.flatMap(event => doSomething(event))
.addSink(writeToOutputStream)
class TimedEventWatermarkExtractor
extends AssignerWithPeriodicWatermarks[TimedEvent] {
// specify the event time
override def extractTimestamp(element: TimedEvent,
previousElementTimestamp: Long): Long = {
// set event time to 'eventTime' field in events
element.eventTime.getMillis
}
override def getCurrentWatermark: Watermark = ???
}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class TimedEventWatermarkExtractor
    extends AssignerWithPeriodicWatermarks[TimedEvent] {
  // specify the event time
  override def extractTimestamp(element: TimedEvent,
                                previousElementTimestamp: Long): Long = {
    // set event time to the 'eventTime' field in events
    element.eventTime.getMillis
  }
  // Flink calls this method periodically to emit a watermark, at the interval
  // configured with ExecutionConfig.setAutoWatermarkInterval(...)
  override def getCurrentWatermark: Watermark = {
    // the watermark trails the wall clock by one second, so an event-time
    // window closes roughly one second after its end time has passed
    new Watermark(System.currentTimeMillis() - 1000)
  }
}
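The effect of the watermark returned by getCurrentWatermark can be modelled in a few lines of Python. This is a simplified sketch, not Flink code; the function names are hypothetical. It uses Flink's firing rule for event-time windows: a window `[start, end)` fires once the watermark reaches its last timestamp, `end - 1`, and (with no allowed lateness) an element whose timestamp is at or behind the watermark counts as late.

```python
from typing import Iterable, List

def current_watermark(now_ms: int, delay_ms: int = 1000) -> int:
    """Same rule as getCurrentWatermark above: wall-clock time minus a fixed delay."""
    return now_ms - delay_ms

def windows_to_fire(window_ends: Iterable[int], watermark: int) -> List[int]:
    """A window [start, end) may fire once the watermark reaches end - 1."""
    return sorted(end for end in window_ends if end - 1 <= watermark)

def is_late(event_time_ms: int, watermark: int) -> bool:
    """With zero allowed lateness, events at or behind the watermark are late."""
    return event_time_ms <= watermark
```

Because this watermark follows the wall clock rather than the largest event time seen, any event whose `eventTime` lags more than about one second behind the processing machine's clock is treated as late.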
env.enableCheckpointing(10000) // checkpoint every 10 seconds (interval in milliseconds)
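Why a checkpoint must capture the source position and the operator state together can be shown with a single-operator Python model. This is a deliberately simplified sketch (Flink takes distributed snapshots using checkpoint barriers); `count_with_checkpoints` and its parameters are hypothetical. On a simulated crash the job rewinds the source to the saved offset and restores the matching state, so replayed events are not counted twice.

```python
from typing import Dict, List, Optional, Tuple

def count_with_checkpoints(events: List[str], checkpoint_every: int,
                           fail_at: Optional[int] = None) -> Dict[str, int]:
    """Word count that snapshots (source offset, state) every `checkpoint_every`
    events. If `fail_at` is given, the job crashes once at that offset, restores
    the last snapshot and replays the source from the saved offset."""
    state: Dict[str, int] = {}
    snapshot: Tuple[int, Dict[str, int]] = (0, {})  # (offset, copy of state)
    offset, failed = 0, False
    while offset < len(events):
        if offset == fail_at and not failed:
            failed = True
            offset, saved = snapshot  # rewind the source ...
            state = dict(saved)       # ... and roll state back to match it
            continue
        word = events[offset]
        state[word] = state.get(word, 0) + 1
        offset += 1
        if offset % checkpoint_every == 0:
            snapshot = (offset, dict(state))
    return state
```

A run with a crash yields the same counts as a run without one. Rewinding the offset while keeping the post-crash state would instead count the replayed events a second time.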
# Supported backends: jobmanager, filesystem, rocksdb
#
state.backend: filesystem
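import pandas
from sklearn.linear_model import LogisticRegression
from sklearn2pmml import sklearn2pmml
from sklearn2pmml.pipeline import PMMLPipeline

events_df = pandas.read_csv("events.csv")
pipeline = PMMLPipeline(...)
# train on the feature columns only; "notifications" is the label
pipeline.fit(events_df.drop(columns=["notifications"]), events_df["notifications"])
# export the fitted pipeline as PMML so the streaming job can load it
sklearn2pmml(pipeline, "LogisticRegression.pmml", with_repr=True)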
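import java.util
import scala.collection.JavaConverters._
import org.dmg.pmml.FieldName
import org.jpmml.evaluator.{EvaluatorUtil, FieldValue, InputField}

def score(event: RichBusinessEvent, pmmlModel: PmmlModel): Double = {
  // map every model input field to the matching value in the enriched event
  val arguments = new util.LinkedHashMap[FieldName, FieldValue]
  for (inputField: InputField <- pmmlModel.getInputFields.asScala) {
    val fieldName = inputField.getName
    // assumes the event exposes its customer attributes as a map named `all`
    arguments.put(fieldName, inputField.prepare(event.customer.all(fieldName.getValue)))
  }
  // evaluate the model and return the notification's relevancy score
  val results = pmmlModel.evaluate(arguments)
  pmmlModel.getTargetFields.asScala.headOption match {
    case Some(targetField) =>
      // decode the JPMML result into a plain value; assumes a numeric target
      EvaluatorUtil.decode(results.get(targetField.getName))
        .asInstanceOf[Number].doubleValue()
    case _ => throw new Exception("No valid target")
  }
}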
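For a binary logistic regression such as the one exported above, the relevancy score is the positive-class probability, the sigmoid of the intercept plus the weighted sum of the inputs. The Python sketch below computes that value directly; it is not the JPMML evaluator, and the weights, feature names and the rule that missing features count as 0.0 are assumptions for illustration.

```python
import math
from typing import Dict

def logistic_score(features: Dict[str, float], weights: Dict[str, float],
                   intercept: float) -> float:
    """P(positive class) = sigmoid(intercept + sum_i w_i * x_i)."""
    # features missing from the event count as 0.0 (an assumption of this sketch)
    z = intercept + sum(w * features.get(name, 0.0) for name, w in weights.items())
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)  # equivalent form that avoids overflow for large negative z
    return e / (1.0 + e)
```

A score of 0.5 means the linear part is exactly zero; scores near 1 mark notifications the model considers highly relevant.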
Message bus | Streaming technology | Database |
---|---|---|
Kafka | Flink | Cassandra |
Kafka | Spark Structured Streaming | Ignite |
Kinesis Data Streams | Kinesis Data Firehose | DynamoDB |
Read more about streaming analytics at:
Source code and presentation are available at: