Overview
This tutorial focuses on streaming data from a Kafka cluster into a tf.data.Dataset, which is then used in conjunction with tf.keras for training and inference.
Kafka is a distributed event-streaming platform that provides scalable, fault-tolerant streaming of data across data pipelines. Many large enterprises rely on it where mission-critical data delivery is a primary requirement.
Setup
Install the required tensorflow-io and kafka packages
pip install -q tensorflow-io
pip install -q kafka-python
Import packages
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio
Validate tf and tfio imports
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.17.0
tensorflow version: 2.4.1
Download and set up Kafka and Zookeeper instances
For demo purposes, the following instances are set up locally:
- Kafka (Brokers: 127.0.0.1:9092)
- Zookeeper (Node: 127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -xzf kafka_2.13-2.7.0.tgz
The default configurations provided by Apache Kafka are used to spin up the instances.
./kafka_2.13-2.7.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.0/config/zookeeper.properties
./kafka_2.13-2.7.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.0/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running
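A fixed 10-second sleep can be too short on a slow machine and longer than needed on a fast one. As a minimal sketch, assuming the default ports listed above, the helper below polls a TCP port until something accepts connections. The name `wait_for_port` and its parameters are hypothetical and not part of Kafka or tensorflow-io. An open port only shows that a process is listening, not that the broker has finished starting.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=30.0, poll_interval_s=0.5):
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # A successful connect means some process is listening on the port.
            with socket.create_connection((host, port), timeout=poll_interval_s):
                return True
        except OSError:
            # Nothing is listening yet: wait briefly and try again.
            time.sleep(poll_interval_s)
    return False
```

In this setup it could be called as `wait_for_port("127.0.0.1", 2181)` for Zookeeper and `wait_for_port("127.0.0.1", 9092)` for the Kafka broker.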
Once the instances are started as daemon processes, grep for kafka in the process list. The two java processes correspond to the zookeeper and kafka instances.
ps -ef | grep kafka
kbuilder 3128 22247 2 19:38 ? 00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000 kbuilder 3562 1 13 19:38 ? 00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.0/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-api-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-basic-auth-extension-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-file-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-json-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-mirror-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-mirror-client-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-runtime-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-transforms-2.7.0.jar:/tmpfs/sr
c/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/ka
fka_2.13-2.7.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-client-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-common-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-container-servlet-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-container-servlet-core-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-hk2-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-media-jaxb-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-server-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-client-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-continuation-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-http-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-io-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-security-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-server-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-servlet-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-servlets-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-util-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-clients-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-log4j-appender-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-raft-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-stream
s-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-streams-examples-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-streams-scala_2.13-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-streams-test-utils-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-tools-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka_2.13-2.7.0-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka_2.13-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/maven-artifact-3.6.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-buffer-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-codec-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-common-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-handler-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-resolver-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-transport-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-transport-native-epoll-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-transport-native-unix-common-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials
/kafka_2.13-2.7.0/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/zookeeper-3.5.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/zookeeper-jute-3.5.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.0/config/zookeeper.properties kbuilder 3930 1 52 19:38 ? 
00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.0/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-api-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-basic-auth-extension-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-file-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-json-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-mirror-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-mirror-client-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-runtime-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/connect-transforms-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorial
s/kafka_2.13-2.7.0/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/
bin/../libs/jersey-client-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-common-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-container-servlet-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-container-servlet-core-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-hk2-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-media-jaxb-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jersey-server-2.31.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-client-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-continuation-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-http-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-io-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-security-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-server-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-servlet-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-servlets-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jetty-util-9.4.33.v20201020.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-clients-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-log4j-appender-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-raft-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-streams-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-streams-examples-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-streams-s
cala_2.13-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-streams-test-utils-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka-tools-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka_2.13-2.7.0-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/kafka_2.13-2.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/maven-artifact-3.6.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-buffer-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-codec-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-common-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-handler-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-resolver-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-transport-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-transport-native-epoll-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/netty-transport-native-unix-common-4.1.51.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorial
s/kafka_2.13-2.7.0/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/zookeeper-3.5.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/zookeeper-jute-3.5.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.0/config/server.properties kbuilder 4118 3132 0 19:38 pts/0 00:00:00 /bin/bash -c ps -ef | grep kafka kbuilder 4120 4118 0 19:38 pts/0 00:00:00 grep kafka
Create the kafka topics with the following specs:
- susy-train: partitions=1, replication-factor=1
- susy-test: partitions=2, replication-factor=1
./kafka_2.13-2.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train.
Created topic susy-test.
Describe the topic for details on the configuration
./kafka_2.13-2.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train  PartitionCount: 1  ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-train  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
Topic: susy-test  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: susy-test  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
The replication factor of 1 indicates that the data is not being replicated, because this kafka setup has only a single broker. Production clusters can contain hundreds of brokers, and that is where fault tolerance through replication comes into the picture.
Please refer to the docs for more details.
SUSY Dataset
As an event-streaming platform, Kafka enables data from various sources to be written into it. For instance:
- Web traffic logs
- Astronomical measurements
- IoT sensor data
- Product reviews and many more.
For the purpose of this tutorial, let's download the SUSY dataset and feed the data into kafka manually. The goal of this classification problem is to distinguish between a signal process which produces supersymmetric particles and a background process which does not.
curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
Explore the dataset
The first column is the class label (1 for signal, 0 for background), followed by the 18 features (8 low-level features then 10 high-level features). The first 8 features are kinematic properties measured by the particle detectors in the accelerator. The last 10 features are functions of the first 8 features. These are high-level features derived by physicists to help discriminate between the two classes.
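The column layout described above can be sketched in plain Python. The row below is made up for illustration and is not taken from SUSY.csv. Only the positions (1 label, then 8 low-level features, then 10 high-level features) follow the description.

```python
# Hypothetical row in the SUSY layout: 1 label followed by 18 feature values.
# The numbers are invented for illustration only.
row = "1.0," + ",".join(str(round(0.1 * i, 1)) for i in range(1, 19))

values = [float(v) for v in row.split(",")]
label = int(values[0])       # 1 = signal, 0 = background
low_level = values[1:9]      # 8 kinematic features measured by the detectors
high_level = values[9:19]    # 10 features derived from the first 8
```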
COLUMNS = [
    # labels
    'class',
    # low-level features
    'lepton_1_pT',
    'lepton_1_eta',
    'lepton_1_phi',
    'lepton_2_pT',
    'lepton_2_eta',
    'lepton_2_phi',
    'missing_energy_magnitude',
    'missing_energy_phi',
    # high-level derived features
    'MET_rel',
    'axial_MET',
    'M_R',
    'M_TR_2',
    'R',
    'MT2',
    'S_R',
    'M_Delta_R',
    'dPhi_r_b',
    'cos(theta_r1)'
]
The entire dataset consists of 5 million rows. However, for the purpose of this tutorial, let's consider only a fraction of the dataset (100,000 rows) so that less time is spent on moving the data and more time on understanding the functionality of the API.
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)
Split the dataset
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))
x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]
x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]
# The labels are set as the kafka message keys so as to store data
# in multiple partitions, thus enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))
x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples:  60000
Number of testing sample:  40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)
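The `to_csv(...).split("\n")[1:]` idiom used above is compact but easy to misread. The sketch below reproduces its three steps on a small hand-written string that stands in for the output of `DataFrame.to_csv(index=False)` (a header line, one line per row, and a trailing newline). The values are made up.

```python
# Stand-in for DataFrame.to_csv(index=False); the numbers are invented.
csv_text = "lepton_1_pT,lepton_1_eta\n0.97,0.65\n1.67,0.06\n"

lines = csv_text.split("\n")          # header, two rows, and a trailing ''
rows = lines[1:]                      # drop the header line
messages = list(filter(None, rows))   # drop the empty string left by the final newline
```

Each element of `messages` is one comma-separated row, which is the form in which the rows are later written to kafka.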
Store the train and test data in kafka
Storing the data in kafka simulates an environment for continuous remote data retrieval for training and inference purposes.
def error_callback(exc):
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        # The row is the message value and the label is the message key.
        producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
        count += 1
    # Block until all buffered messages have been sent.
    producer.flush()
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))
write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test
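Because the labels are used as message keys, all messages with the same label end up in the same partition of a multi-partition topic such as susy-test. The sketch below illustrates key-based partitioning in general. It uses `zlib.crc32` as a stand-in hash, which is an assumption for illustration and not the hash a real Kafka client applies, so the partition numbers it produces will not match an actual cluster. The function name `pick_partition` is hypothetical.

```python
import zlib
from collections import Counter


def pick_partition(key: bytes, num_partitions: int) -> int:
    # Stand-in hash (CRC32), NOT the hash used by real Kafka clients.
    return zlib.crc32(key) % num_partitions


# Keys shaped like the labels written above (illustrative values).
keys = [b"0.0", b"1.0", b"1.0", b"0.0", b"0.0"]
assignment = Counter(pick_partition(k, 2) for k in keys)
```

The property that matters is determinism: the same key always maps to the same partition, so the per-partition message counts follow the key distribution.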
Define the tfio train dataset
The IODataset class is utilized for streaming data from kafka into tensorflow. The class inherits from tf.data.Dataset and thus has all the useful functionalities of tf.data.Dataset out of the box.
def decode_kafka_item(item):
    # Parse the CSV row into NUM_COLUMNS float tensors.
    message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
    # The label was stored as the message key.
    key = tf.strings.to_number(item.key)
    return (message, key)
BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
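To make the decoding step concrete, here is a plain-Python analogue of what `decode_kafka_item` does to a single record: the message bytes become a list of floats and the key bytes become the label. This is only a sketch. The function `decode_item` is hypothetical and operates on Python bytes, not on tensors.

```python
def decode_item(message: bytes, key: bytes, num_columns: int):
    """Parse one Kafka record into (features, label) using plain Python."""
    features = [float(v) for v in message.decode("utf-8").split(",")]
    if len(features) != num_columns:
        raise ValueError(f"expected {num_columns} values, got {len(features)}")
    label = float(key.decode("utf-8"))
    return features, label


# Made-up record with three columns instead of the tutorial's 18.
features, label = decode_item(b"0.5,1.25,-0.75", b"1.0", num_columns=3)
```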
Build and train the model
# Set the parameters
OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=False)  # the last layer applies a sigmoid, so the model outputs probabilities
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
print(model.summary())
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
dense (Dense)                (None, 128)               2432
_________________________________________________________________
dropout (Dropout)            (None, 128)               0
_________________________________________________________________
dense_1 (Dense)              (None, 256)               33024
_________________________________________________________________
dropout_1 (Dropout)          (None, 256)               0
_________________________________________________________________
dense_2 (Dense)              (None, 128)               32896
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 129
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
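The parameter counts in the summary can be checked by hand: a Dense layer has one weight per (input, unit) pair plus one bias per unit, and Dropout layers have no parameters. A small sketch of that arithmetic, using the 18 input features and the layer widths from the model definition (the helper `dense_params` is hypothetical):

```python
def dense_params(inputs: int, units: int) -> int:
    # One weight per (input, unit) pair plus one bias per unit.
    return inputs * units + units


layer_sizes = [18, 128, 256, 128, 1]  # input width followed by each Dense layer
per_layer = [dense_params(i, o) for i, o in zip(layer_sizes, layer_sizes[1:])]
total = sum(per_layer)
print(per_layer, total)  # [2432, 33024, 32896, 129] 68481
```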
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
938/938 [==============================] - 31s 32ms/step - loss: 0.5218 - accuracy: 0.7379
Epoch 2/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4578 - accuracy: 0.7858
Epoch 3/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4516 - accuracy: 0.7906
Epoch 4/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4492 - accuracy: 0.7908
Epoch 5/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4458 - accuracy: 0.7944
Epoch 6/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7947
Epoch 7/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7939
Epoch 8/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4442 - accuracy: 0.7941
Epoch 9/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7951
Epoch 10/10
938/938 [==============================] - 30s 32ms/step - loss: 0.4423 - accuracy: 0.7962
<tensorflow.python.keras.callbacks.History at 0x7fe8b8405b00>
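The step count of 938 per epoch follows directly from the data volume and the batch size: 60,000 training messages in batches of 64, with a smaller final batch. The sketch below works through that arithmetic, and applies the same calculation to the 40,000 test messages.

```python
import math

BATCH_SIZE = 64

train_steps = math.ceil(60000 / BATCH_SIZE)   # 938 steps per epoch
test_steps = math.ceil(40000 / BATCH_SIZE)    # 625 steps, divides evenly
# The final training batch holds whatever is left after the full batches.
last_train_batch = 60000 - (train_steps - 1) * BATCH_SIZE  # 32 rows
```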
Since only a fraction of the dataset is being utilized, the training accuracy plateaus just under 80%. Feel free to store additional data in kafka for better model performance. Also, since the goal was just to demonstrate the functionality of the tfio kafka datasets, a small and simple neural network was used. One can increase the complexity of the model, modify the learning strategy, tune hyper-parameters, etc. for exploration purposes. For a baseline approach, please refer to this article.
Infer on the test data
To infer on the test data by adhering to the 'exactly-once' semantics along with fault-tolerance, the streaming.KafkaGroupIODataset can be utilized.
Define the tfio test dataset
The stream_timeout parameter blocks for the given duration, waiting for new data points to be streamed into the topic. This removes the need to create new datasets if data is streamed into the topic intermittently.
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)
def decode_kafka_test_item(raw_message, raw_key):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(raw_key)
    return (message, key)
test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
Though this class can be used for training purposes, there are caveats which need to be addressed. Once all the messages are read from kafka and the latest offsets are committed using the streaming.KafkaGroupIODataset, the consumer doesn't restart reading the messages from the beginning. Thus, while training, it is possible to train for only a single epoch with the data continuously flowing in. This kind of functionality has limited use cases during the training phase: it fits situations where a datapoint is no longer required once the model has consumed it and can be discarded.
However, this functionality shines when it comes to robust inference with exactly-once semantics.
Evaluate the performance on the test data
res = model.evaluate(test_ds)
print("test loss, test acc:", res)
625/625 [==============================] - 13s 21ms/step - loss: 0.4367 - accuracy: 0.7980
test loss, test acc: [0.4366855025291443, 0.7980499863624573]
Since the inference is based on 'exactly-once' semantics, the evaluation on the test set can be run only once. In order to run the inference again on the test data, a new consumer group should be used.
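The reason a second run with the same group returns nothing is that the committed offsets are tracked per consumer group. The toy model below illustrates this with an in-memory list in place of a topic. The class `TinyLog` and the group name `testcg-rerun` are hypothetical, and a real broker involves partitions, rebalancing, and commit timing that this sketch ignores.

```python
class TinyLog:
    """In-memory stand-in for a single-partition topic with per-group offsets."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = {}  # group_id -> next offset to read

    def poll(self, group_id):
        # An unknown group starts at offset 0, similar to auto.offset.reset=earliest.
        start = self.committed.get(group_id, 0)
        batch = self.messages[start:]
        # Commit the new position so the same group never re-reads these messages.
        self.committed[group_id] = len(self.messages)
        return batch


log = TinyLog(["m0", "m1", "m2"])
first = log.poll("testcg")         # all three messages
second = log.poll("testcg")        # empty: the offsets are already committed
fresh = log.poll("testcg-rerun")   # a new group starts from the beginning
```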
Track the offset lag of the testcg consumer group
./kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP   TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                   HOST           CLIENT-ID
testcg  susy-test  0          21592           21592           0    rdkafka-66bc39ef-397c-4190-a17c-69ab519d4f53  /10.142.0.106  rdkafka
testcg  susy-test  1          18408           18408           0    rdkafka-66bc39ef-397c-4190-a17c-69ab519d4f53  /10.142.0.106  rdkafka
Once the current-offset matches the log-end-offset for all the partitions, it indicates that the consumer(s) have completed fetching all the messages from the kafka topic.
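The LAG column is simply the log-end-offset minus the current-offset for each partition. The sketch below recomputes it from the values in the consumer-group output above and also checks that the two partitions together account for all 40,000 test messages.

```python
# (partition, current_offset, log_end_offset) copied from the consumer-group output.
rows = [(0, 21592, 21592), (1, 18408, 18408)]

lag = {partition: end - current for partition, current, end in rows}
all_caught_up = all(v == 0 for v in lag.values())
total_consumed = sum(current for _, current, _ in rows)  # 40000
```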
Online learning
The online machine learning paradigm differs from the traditional way of training machine learning models. In online learning, the model incrementally updates its parameters as soon as new data points become available, and this process is expected to continue indefinitely. In the traditional approach, by contrast, the dataset is fixed and the model iterates over it n times. In online learning, data that has been consumed by the model may not be available for training again.
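To make the incremental update concrete, here is a framework-free sketch of online learning: a one-feature logistic regression that takes a single gradient step per incoming sample and never revisits it. The generator `stream` is a hypothetical stand-in for an unbounded source, and the learning rate is an arbitrary choice; none of this is part of the tutorial's Keras model.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def online_sgd_step(weights, bias, features, label, lr=0.1):
    """Apply one logistic-regression SGD update from a single sample."""
    z = sum(w * x for w, x in zip(weights, features)) + bias
    error = sigmoid(z) - label  # gradient of the log loss with respect to z
    new_weights = [w - lr * error * x for w, x in zip(weights, features)]
    new_bias = bias - lr * error
    return new_weights, new_bias


def stream():
    """Hypothetical data source: the label is 1 when the feature is positive."""
    for i in range(200):
        x = 1.0 if i % 2 == 0 else -1.0
        yield [x], (1.0 if x > 0 else 0.0)


weights, bias = [0.0], 0.0
for features, label in stream():
    # Each sample is used exactly once and then discarded.
    weights, bias = online_sgd_step(weights, bias, features, label)

print(weights, bias)
```

Unlike epoch-based training, there is no outer loop over the dataset: the parameters after the last sample are simply the current model.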
By utilizing the streaming.KafkaBatchIODataset, it is now possible to train models in this fashion. Let's continue to use the SUSY dataset to demonstrate this functionality.
The tfio training dataset for online learning
The streaming.KafkaBatchIODataset is similar to the streaming.KafkaGroupIODataset in its API. Additionally, it is recommended to use the stream_timeout parameter to configure how long the dataset blocks waiting for new messages before timing out. In the instance below, the dataset is configured with a stream_timeout of 10000 milliseconds. This means that, after all the messages from the topic have been consumed, the dataset waits for an additional 10 seconds before timing out and disconnecting from the Kafka cluster. If new messages are streamed into the topic before the timeout, data consumption and model training resume for the newly consumed data points. To block indefinitely, set it to -1.
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)
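Because stream_timeout is given in milliseconds and uses -1 as a sentinel, a small conversion helper can make call sites easier to read. The sketch below is only an illustration: `stream_timeout_ms` and `parse_configuration` are hypothetical helper names, not part of tensorflow-io.

```python
def stream_timeout_ms(seconds=None):
    """Convert an idle wait in seconds to a stream_timeout value.

    None is mapped to -1, which the tutorial describes as blocking
    indefinitely.
    """
    if seconds is None:
        return -1
    if seconds < 0:
        raise ValueError("seconds must be non-negative, or None to block")
    return int(round(seconds * 1000))


def parse_configuration(entries):
    """Split "key=value" configuration strings into a dict for inspection."""
    return dict(entry.split("=", 1) for entry in entries)


print(stream_timeout_ms(10))  # 10000
print(stream_timeout_ms())    # -1

config = parse_configuration([
    "session.timeout.ms=7000",
    "max.poll.interval.ms=8000",
    "auto.offset.reset=earliest",
])
print(config["auto.offset.reset"])  # earliest
```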
Every item that online_train_ds generates is itself a tf.data.Dataset. Thus, all the standard transformations can be applied as usual.
def decode_kafka_online_item(raw_message, raw_key):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(raw_key)
    return (message, key)

for mini_ds in online_train_ds:
    mini_ds = mini_ds.shuffle(buffer_size=32)
    mini_ds = mini_ds.map(decode_kafka_online_item)
    mini_ds = mini_ds.batch(32)
    model.fit(mini_ds, epochs=3)
Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4279 - accuracy: 0.8115 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4130 - accuracy: 0.8213 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.3996 - accuracy: 0.8242 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4646 - accuracy: 0.7920 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4415 - accuracy: 0.8047 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4296 - accuracy: 0.8018 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4410 - accuracy: 0.7969 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4223 - accuracy: 0.8174 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4187 - accuracy: 0.8096 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4679 - accuracy: 0.7734 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4292 - accuracy: 0.7979 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4411 - accuracy: 0.7959 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4750 - accuracy: 0.7695 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4460 - accuracy: 0.7959 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4345 - accuracy: 0.8008 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4282 - accuracy: 0.8154 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4086 - accuracy: 0.8184 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.3942 - accuracy: 0.8291 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4265 - accuracy: 0.7852 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4233 - accuracy: 0.7988 Epoch 3/3 32/32 [==============================] - 0s 
3ms/step - loss: 0.4037 - accuracy: 0.8164 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4463 - accuracy: 0.7988 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4064 - accuracy: 0.8164 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4093 - accuracy: 0.8115 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4652 - accuracy: 0.7930 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4508 - accuracy: 0.7930 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4296 - accuracy: 0.8018 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4671 - accuracy: 0.7812 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4519 - accuracy: 0.7910 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4431 - accuracy: 0.7930 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4905 - accuracy: 0.7637 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4691 - accuracy: 0.7764 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4518 - accuracy: 0.7803 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4379 - accuracy: 0.7930 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4314 - accuracy: 0.8057 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4232 - accuracy: 0.7988 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4892 - accuracy: 0.7539 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4660 - accuracy: 0.7773 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4572 - accuracy: 0.7842 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4474 - accuracy: 0.7871 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4435 - accuracy: 0.7920 Epoch 3/3 32/32 
[==============================] - 0s 3ms/step - loss: 0.4247 - accuracy: 0.8047 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4306 - accuracy: 0.8115 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4219 - accuracy: 0.8164 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4017 - accuracy: 0.8223 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4662 - accuracy: 0.7881 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4563 - accuracy: 0.7988 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4445 - accuracy: 0.7930 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4437 - accuracy: 0.7910 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4282 - accuracy: 0.7988 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4203 - accuracy: 0.8057 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4227 - accuracy: 0.7998 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4005 - accuracy: 0.8223 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.3905 - accuracy: 0.8184 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4617 - accuracy: 0.7871 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4414 - accuracy: 0.7891 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4320 - accuracy: 0.8018 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4493 - accuracy: 0.7900 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4418 - accuracy: 0.8037 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4265 - accuracy: 0.8096 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4740 - accuracy: 0.7783 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 
0.4498 - accuracy: 0.7812 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4473 - accuracy: 0.7910 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4439 - accuracy: 0.7900 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4217 - accuracy: 0.8047 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4126 - accuracy: 0.8135 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4417 - accuracy: 0.7998 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4120 - accuracy: 0.8105 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.3967 - accuracy: 0.8213 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4657 - accuracy: 0.7822 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4437 - accuracy: 0.7998 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4344 - accuracy: 0.8057 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4385 - accuracy: 0.8018 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4314 - accuracy: 0.8096 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4186 - accuracy: 0.8086 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4918 - accuracy: 0.7725 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4719 - accuracy: 0.7725 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4586 - accuracy: 0.7920 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4989 - accuracy: 0.7607 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4835 - accuracy: 0.7676 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4796 - accuracy: 0.7695 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4645 - accuracy: 0.7754 Epoch 2/3 32/32 
[==============================] - 0s 3ms/step - loss: 0.4505 - accuracy: 0.7812 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4420 - accuracy: 0.7852 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4219 - accuracy: 0.8047 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4029 - accuracy: 0.8135 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.3870 - accuracy: 0.8252 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4627 - accuracy: 0.7871 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4313 - accuracy: 0.8066 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4324 - accuracy: 0.8066 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4532 - accuracy: 0.7959 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4370 - accuracy: 0.7871 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4301 - accuracy: 0.8027 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4319 - accuracy: 0.7910 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4268 - accuracy: 0.7979 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4096 - accuracy: 0.8154 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4738 - accuracy: 0.7734 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4411 - accuracy: 0.7842 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4410 - accuracy: 0.7754 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4437 - accuracy: 0.7881 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4273 - accuracy: 0.8047 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4030 - accuracy: 0.8193 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 
0.4604 - accuracy: 0.7725 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4432 - accuracy: 0.7744 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4299 - accuracy: 0.7969 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4276 - accuracy: 0.8037 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4138 - accuracy: 0.8076 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4009 - accuracy: 0.8105 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4273 - accuracy: 0.8086 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4049 - accuracy: 0.8193 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.3974 - accuracy: 0.8252 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4755 - accuracy: 0.7705 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4454 - accuracy: 0.7832 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4358 - accuracy: 0.7939 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4691 - accuracy: 0.7686 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4572 - accuracy: 0.7793 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4484 - accuracy: 0.7979 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4292 - accuracy: 0.8066 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4220 - accuracy: 0.8145 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4183 - accuracy: 0.8115 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4420 - accuracy: 0.7930 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4384 - accuracy: 0.7939 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4354 - accuracy: 0.7881 Epoch 1/3 32/32 
[==============================] - 0s 3ms/step - loss: 0.4731 - accuracy: 0.7666 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4544 - accuracy: 0.7764 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4451 - accuracy: 0.7812 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4211 - accuracy: 0.8047 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4097 - accuracy: 0.8047 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4013 - accuracy: 0.8164 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4625 - accuracy: 0.7832 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4405 - accuracy: 0.7998 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4224 - accuracy: 0.8066 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4404 - accuracy: 0.7881 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4227 - accuracy: 0.7959 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4079 - accuracy: 0.8066 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4570 - accuracy: 0.7910 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4355 - accuracy: 0.8135 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4281 - accuracy: 0.7998 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4721 - accuracy: 0.7891 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4606 - accuracy: 0.7969 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4401 - accuracy: 0.8008 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4666 - accuracy: 0.7842 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4574 - accuracy: 0.7871 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 
0.4460 - accuracy: 0.7920 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4538 - accuracy: 0.7969 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4420 - accuracy: 0.8047 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4189 - accuracy: 0.8086 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4766 - accuracy: 0.7627 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4589 - accuracy: 0.7725 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4550 - accuracy: 0.7725 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4338 - accuracy: 0.8018 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4217 - accuracy: 0.8057 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4044 - accuracy: 0.8154 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4523 - accuracy: 0.7881 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4451 - accuracy: 0.7920 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4208 - accuracy: 0.8018 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4711 - accuracy: 0.7666 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4518 - accuracy: 0.7852 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4293 - accuracy: 0.7930 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4786 - accuracy: 0.7783 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4586 - accuracy: 0.7842 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4426 - accuracy: 0.7881 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4392 - accuracy: 0.8115 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4181 - accuracy: 0.8252 Epoch 3/3 32/32 
[==============================] - 0s 3ms/step - loss: 0.4134 - accuracy: 0.8223 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4380 - accuracy: 0.7998 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4271 - accuracy: 0.8018 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4118 - accuracy: 0.8105 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4469 - accuracy: 0.7939 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4358 - accuracy: 0.7959 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4253 - accuracy: 0.8018 Epoch 1/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4574 - accuracy: 0.8018 Epoch 2/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4531 - accuracy: 0.8018 Epoch 3/3 32/32 [==============================] - 0s 3ms/step - loss: 0.4392 - accuracy: 0.7988 Epoch 1/3 19/19 [==============================] - 0s 3ms/step - loss: 0.4244 - accuracy: 0.8191 Epoch 2/3 19/19 [==============================] - 0s 3ms/step - loss: 0.4049 - accuracy: 0.8339 Epoch 3/3 19/19 [==============================] - 0s 3ms/step - loss: 0.3980 - accuracy: 0.8224
The incrementally trained model can be saved periodically (depending on the use case) and used for inference on the test data in either online or offline mode.
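One possible way to structure periodic saving is sketched below. The loop is written against plain callables so that it runs without Kafka or TensorFlow; `train_online`, `fit_fn`, `save_fn`, and the `save_every` interval are hypothetical names and values chosen for illustration.

```python
def train_online(mini_datasets, fit_fn, save_fn, save_every=10):
    """Fit on each mini-dataset once and save after every `save_every` of them.

    Returns the number of mini-datasets processed.
    """
    processed = 0
    for processed, mini_ds in enumerate(mini_datasets, start=1):
        fit_fn(mini_ds)
        if processed % save_every == 0:
            save_fn(processed)
    # Save once more if the final mini-datasets were not covered above.
    if processed and processed % save_every != 0:
        save_fn(processed)
    return processed


# Stand-ins that only record calls, to show when saves would happen.
fitted, checkpoints = [], []
train_online(range(25), fitted.append, checkpoints.append, save_every=10)
print(checkpoints)  # [10, 20, 25]
```

With the Keras model from this tutorial, `fit_fn` could wrap the shuffle, map, batch and `model.fit` steps, and `save_fn` could call `model.save` with a path that includes the counter; the path layout is left to the use case.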
References:
Baldi, P., P. Sadowski, and D. Whiteson. “Searching for Exotic Particles in High-energy Physics with Deep Learning.” Nature Communications 5 (July 2, 2014)
SUSY Dataset: https://archive.ics.uci.edu/ml/datasets/SUSY#