 
Bas Geerdink | July 17, 2019 | O'Reilly OSCON
                { 
                  "name": "Bas Geerdink",
                  "role": "Technology Lead",
                  "company": "ING Bank",
                  "background": ["Artificial Intelligence",
                                 "Informatics"],
                  "mixins": ["Software engineering",
                             "Architecture",
                             "Management",
                             "Innovation"],
                  "twitter": "@bgeerdink",
                  "linked_in": "bgeerdink"
                }
| Sector | Data source | Pattern | Notification |
|---|---|---|---|
| Marketing | Tweets | Trend analysis | Actionable insights | 
| Finance | Payment data | Fraud detection | Block money transfer | 
| Insurance | Page visits | Customer is stuck in form | Chat window | 
| Healthcare | Patient data | Heart failure | Alert doctor | 
| Traffic | Cars passing | Traffic jam | Update route info | 
| Internet of Things | Machine logs | System failure | Alert to sys admin | 
| Gaming | Player actions | Key combos | Action on screen | 
 
             
                
/** Builds the event pipeline.
  *
  * The pipeline consumes Kafka events, keeps only those that deserialized successfully,
  * transforms them with `doSomething`, and writes the result to `writeToOutputStream`.
  *
  * @param env the Flink streaming environment the Kafka source is attached to
  * @return the transformed event stream; the output sink is already attached to it
  */
def getData(env: StreamExecutionEnvironment): DataStream[MyEvent] = {
    // create schema for deserialization. The argument list must stay on the same line as
    // the method name: Scala's semicolon inference would otherwise treat `(...)` on the
    // next line as a separate tuple statement.
    val readSchema = KafkaSchemaFactory.createKeyedDeserializer(readTopicDef, rawEventFromPayload)
    // consume events
    val rawEventSource =
      new FlinkKafkaConsumer010[BaseKafkaEvent](toStringList(readTopicDef), readSchema, props)
    // connect to source and handle events
    val events = env.addSource(rawEventSource)
      .filter(_.isSuccess)   // deserialize must succeed
      .flatMap(event => doSomething(event))
    // addSink returns a DataStreamSink rather than a DataStream, so attach the sink and
    // return the stream itself to satisfy the declared result type.
    events.addSink(writeToOutputStream)
    events
}
                 
                
/** Window function that emits one `WordCount` per key (word) and time window.
  *
  * Input elements are `(word, count)` pairs. The output is
  * `WordCount(end timestamp of the window, word, total count)`.
  * Type parameters: in = (word, count), out = WordCount, key = word, window = TimeWindow.
  */
class WordCountWindowFunction extends
        WindowFunction[(String, Int), WordCount, String, TimeWindow] {
  override def apply(key: String,
            window: TimeWindow,
            input: Iterable[(String, Int)],
            out: Collector[WordCount]): Unit = {
    // Sum the per-element counts instead of counting elements, so pairs that already
    // carry a count greater than one are not undercounted.
    val count = input.iterator.map(_._2).sum
    out.collect(WordCount(window.getEnd, key, count))
  }
}
                    
                     
                    
                
 
                
val timestampedEvents = env
  .addSource(rawEventSource)
  .filter(_.isSuccess) // deserialize must succeed
  // timestamps and watermarks are supplied by TimedEventWatermarkExtractor
  .assignTimestampsAndWatermarks(new TimedEventWatermarkExtractor)
timestampedEvents
  .flatMap(event => doSomething(event))
  .addSink(writeToOutputStream)
                    
                    /** Periodic timestamp/watermark assigner for `TimedEvent`s.
                      *
                      * Event time is taken from the element's `eventTime` field. Watermarks trail
                      * the wall clock by one second.
                      */
                    class TimedEventWatermarkExtractor
                       extends AssignerWithPeriodicWatermarks[TimedEvent] {
                        // specify the event time
                        override def extractTimestamp(element: TimedEvent,
                                        previousElementTimestamp: Long): Long = {
                            // set event time to 'eventTime' field in events
                            element.eventTime.getMillis
                        }
                        // Called periodically to obtain the next watermark, so it must return a
                        // real value (a `???` placeholder throws NotImplementedError when invoked).
                        // Assumes events arrive within one second of their event time.
                        override def getCurrentWatermark: Watermark =
                            new Watermark(System.currentTimeMillis() - 1000)
                    }
                    
                    /** Periodic timestamp/watermark assigner for `TimedEvent`s.
                      *
                      * The event time of each element is its `eventTime` field. Each emitted
                      * watermark is the current wall-clock time minus one second.
                      */
                    class TimedEventWatermarkExtractor
                       extends AssignerWithPeriodicWatermarks[TimedEvent]() {
                        // event time = the element's own 'eventTime' field, in milliseconds
                        override def extractTimestamp(element: TimedEvent,
                                        previousElementTimestamp: Long): Long =
                            element.eventTime.getMillis
                        // invoked on every ExecutionConfig.setAutoWatermarkInterval(...) tick
                        override def getCurrentWatermark: Watermark = {
                            val lagMillis = 1000L   // one second delay for processing of window
                            val now = System.currentTimeMillis()
                            new Watermark(now - lagMillis)
                        }
                    }
                    
   env.enableCheckpointing(10000)  // checkpoint every 10 seconds (interval is in milliseconds)
                
    # Supported backends: jobmanager, filesystem, rocksdb
    #
    state.backend: filesystem
                 
                
import pandas
from sklearn.linear_model import LogisticRegression
from sklearn2pmml import sklearn2pmml
from sklearn2pmml.pipeline import PMMLPipeline

# Train a logistic-regression model on historical events and export it as PMML.
events_df = pandas.read_csv("events.csv")
# Keep the label column out of the feature matrix; fitting on the full frame would
# let the model see the target it is supposed to predict.
features = events_df.drop(columns=["notifications"])
labels = events_df["notifications"]
pipeline = PMMLPipeline([("classifier", LogisticRegression())])
pipeline.fit(features, labels)
sklearn2pmml(pipeline, "LogisticRegression.pmml", with_repr=True)
                 
                
                /** Scores an event with a PMML model.
                  *
                  * @param event     the event whose attribute values feed the model's input fields
                  * @param pmmlModel the model to evaluate
                  * @return the numeric value of the model's first target field (relevancy score)
                  * @throws Exception if the model has no target field or the target value is not numeric
                  */
                def score(event: RichBusinessEvent, pmmlModel: PmmlModel): Double = {
                    val arguments = new util.LinkedHashMap[FieldName, FieldValue]
                    for (inputField <- pmmlModel.getInputFields.asScala) {
                        val fieldName = inputField.getField.getName
                        // NOTE(review): assumes RichBusinessEvent exposes its attribute values via
                        // `all(name)`; the previous code read an out-of-scope `customer` — confirm.
                        arguments.put(fieldName, inputField.prepare(event.all(fieldName.getValue)))
                    }
                    // return the notification with a relevancy score
                    val results = pmmlModel.evaluate(arguments)
                    pmmlModel.getTargetFields.asScala.headOption match {
                        case Some(targetField) =>
                            results.get(targetField.getName) match {
                                case n: java.lang.Number => n.doubleValue()
                                // NOTE(review): classification models may return a non-numeric
                                // result object here that needs decoding first — confirm per model.
                                case other => throw new Exception(s"Target value is not numeric: $other")
                            }
                        case None => throw new Exception("No valid target")
                    }
                }
| Message bus | Streaming technology | Database |
|---|---|---|
| Kafka | Flink | Cassandra |
| Kafka | Spark Structured Streaming | Ignite |
| Kinesis Data Streams | Kinesis Data Firehose | DynamoDB |
Read more about streaming analytics at:
Source code and presentation are available at:
