Bas Geerdink | Big Data Meetup | 03-05-2018
| Sector | Data source | Pattern + prediction | Notification |
|---|---|---|---|
| Finance | Debit card transactions | Running out of credit | Actionable insight |
| Finance | Payment data | Fraud detection | Block money transfer |
| Insurance | Page visits | Customer is stuck in form | Chat window |
| Healthcare | Patient data | Heart failure | Alert doctor |
| Retail | Tweets | Sentiment analysis | Social media response |
| Traffic | Number of cars passing | Traffic jam | Update route info |
| Manufacturing | Logs | System failure | Alert to sys admin |
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.5.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.5.0</version>
    <scope>provided</scope>
</dependency>
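The snippets that follow assume a standard Flink job skeleton around them; a minimal sketch (the class name matches the entry point used in the run command later, the job name is illustrative):

import org.apache.flink.streaming.api.scala._

object StyxAppJob {
  def main(args: Array[String]): Unit = {
    // obtain the execution environment (local or cluster, depending on how the job is run)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // ... build the pipeline here, using the functions shown below ...
    env.execute("styx-app")
  }
}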
def createKafkaEventStream(env: StreamExecutionEnvironment): DataStream[RawEvent] = {
  // create KeyedDeserializationSchema
  val readSchema = KafkaSchemaFactory.createKeyedDeserializer(readTopicDef, rawEventFromPayload)
  // consume events
  val rawEventSource = new FlinkKafkaConsumer010[BaseKafkaEvent](toStringList(readTopicDef), readSchema, props)
  // create source
  env.addSource(rawEventSource)
    .filter(_.isSuccess) // keep only events where decrypting from Kafka succeeded
    .flatMap(_.toOption) // unwrap the successful results
}
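The consumer properties are not shown on the slide; a plausible minimal configuration (broker address and group id are assumptions):

import java.util.Properties

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address
props.setProperty("group.id", "styx-consumer")           // assumed consumer group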
taskmanager.numberOfTaskSlots: 4
parallelism.default: 2
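These flink-conf.yaml defaults can also be overridden per job or per operator in code; a sketch (the map function is hypothetical):

// override the cluster-wide parallelism.default for this job
env.setParallelism(2)
// or override it for a single operator
sourceStream.map(toBusinessEvent).setParallelism(4)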
def createCepPipeline(sourceStream: DataStream[RawEvent]): DataStream[BusinessEvent] = {
  sourceStream
    .keyBy(_.customerId.head)          // create parallelism: key by first character, 0..9
    .window(GlobalWindows.create())    // one global window per key
    .apply(new WindowResultFunction()) // logic: pattern match
    .map(event => event.addTimeStamp("window"))
}
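WindowResultFunction is not shown on the slide; its shape follows Flink's WindowFunction interface. A sketch, assuming the key is the Char produced by the keyBy above (note that a GlobalWindows window only fires with a custom trigger, which the slide omits):

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

class WindowResultFunction extends WindowFunction[RawEvent, BusinessEvent, Char, GlobalWindow] {
  override def apply(key: Char, window: GlobalWindow,
                     input: Iterable[RawEvent], out: Collector[BusinessEvent]): Unit = {
    // inspect the buffered events for the pattern and emit a BusinessEvent on a match
    // (the actual matching logic is application-specific and not shown here)
  }
}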
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.enableCheckpointing(10000) // checkpoint every 10 seconds
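With event time enabled, the source stream also needs timestamps and watermarks; a minimal sketch, assuming RawEvent carries an epoch-millis timestamp field:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

val withTimestamps = sourceStream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[RawEvent](Time.seconds(10)) {
    // RawEvent.timestamp is an assumption; use whatever field holds the event time
    override def extractTimestamp(event: RawEvent): Long = event.timestamp
  })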
# Supported backends: jobmanager, filesystem, rocksdb
#
state.backend: filesystem
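The backend can also be set per job in code instead of in flink-conf.yaml; a sketch (the checkpoint path is an assumption):

import org.apache.flink.runtime.state.filesystem.FsStateBackend

env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))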
# show the current list of jobs: gives the job id
/flink/bin/flink list
# create a savepoint; wait for the response: gives the savepoint id
/flink/bin/flink savepoint [JOB_ID]
# cancel the running job
/flink/bin/flink cancel [JOB_ID]
# start the new job from the savepoint
/flink/bin/flink run -p 4 \
  -s jobmanager://savepoints/[SAVEPOINT_ID] \
  -c ai.styx.StyxAppJob styx-app-0.0.1.jar \
  --config /opt/flink/jars/reference.conf
val cep = Pattern.begin[ClickEvent]("Page1")
  .where(event => event match {
    case c: ClickEvent => c.processStep.equals(steps(0))
    case _ => false
  })
  .next("Page2") // check that Page1 is followed by Page2
  .where(event => event match {
    case c: ClickEvent => c.processStep.equals(steps(1))
    case _ => false
  })
  .next("Page3") // check that Page2 is followed by Page3
  .where(event => event match {
    case c: ClickEvent => c.processStep.equals(steps(2))
    case _ => false
  })
  .within(Time.minutes(30)) // no match within 30 minutes = "stuck customer" detected
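Applying the pattern and harvesting the timeout side (the stuck customer) could look like this; a sketch, assuming ClickEvent has a customerId field, a clickStream input, and illustrative output types:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.CEP

// hypothetical result types for the two outcomes
case class StuckCustomer(customerId: String)   // pattern timed out: open the chat window
case class FunnelCompleted(customerId: String) // all three pages visited in time

val patternStream = CEP.pattern(clickStream.keyBy(_.customerId), cep)

// select returns Either: Left for timeouts (the interesting case here), Right for full matches
val alerts: DataStream[Either[StuckCustomer, FunnelCompleted]] =
  patternStream.select(
    (partial: Map[String, Iterable[ClickEvent]], timeoutTimestamp: Long) =>
      StuckCustomer(partial("Page1").head.customerId)
  ) {
    matched: Map[String, Iterable[ClickEvent]] =>
      FunnelCompleted(matched("Page3").head.customerId)
  }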
import pandas
from sklearn.linear_model import LogisticRegression
from sklearn2pmml import sklearn2pmml
from sklearn2pmml.pipeline import PMMLPipeline

events_df = pandas.read_csv("events.csv")
# single-step pipeline wrapping the logistic regression classifier
pipeline = PMMLPipeline([("classifier", LogisticRegression())])
pipeline.fit(events_df, events_df["notifications"])
sklearn2pmml(pipeline, "LogisticRegression.pmml", with_repr = True)
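On the Flink side, the exported PMML file can be loaded once with the JPMML evaluator and handed to the score function below; a sketch, assuming the file ships with the job (the PmmlModel type on the slide is presumably a thin wrapper around this evaluator):

import java.io.FileInputStream
import org.jpmml.evaluator.ModelEvaluatorFactory
import org.jpmml.model.PMMLUtil

val pmml = PMMLUtil.unmarshal(new FileInputStream("LogisticRegression.pmml"))
val evaluator = ModelEvaluatorFactory.newInstance().newModelEvaluator(pmml)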
import java.util
import scala.collection.JavaConverters._
import org.dmg.pmml.FieldName
import org.jpmml.evaluator.{FieldValue, InputField}

def score(event: RichBusinessEvent, pmmlModel: PmmlModel): Double = {
  val arguments = new util.LinkedHashMap[FieldName, FieldValue]
  for (inputField: InputField <- pmmlModel.getInputFields.asScala) {
    val fieldName = inputField.getField.getName
    // look up the raw value for this model field on the customer profile
    arguments.put(fieldName, inputField.prepare(event.customer.all(fieldName.getValue)))
  }
  // return the notification with a relevancy score
  val results = pmmlModel.evaluate(arguments)
  pmmlModel.getTargetFields.asScala.headOption match {
    case Some(targetField) =>
      val targetFieldValue = results.get(targetField.getName)
      targetFieldValue.asInstanceOf[Double]
    case _ => throw new Exception("No valid target field in the PMML model")
  }
}
Read more about streaming analytics at:
Source code and presentation are available at: