Robust machine learning on streaming data using Kafka and Tensorflow-IO


Overview

This tutorial focuses on streaming data from a Kafka cluster into a tf.data.Dataset, which is then used in conjunction with tf.keras for training and inference.

Kafka is primarily a distributed event-streaming platform that provides scalable and fault-tolerant streaming data across data pipelines. It is an essential technical component for a plethora of major enterprises where mission-critical data delivery is a primary requirement.

Setup

Install the required tensorflow-io and kafka packages

pip install -q tensorflow-io
pip install -q kafka-python

Import packages

import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

Validate the tf and tfio imports

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.17.0
tensorflow version: 2.4.1

Download and set up the Kafka and Zookeeper instances

For demo purposes, the following instances are set up locally:

  • Kafka (Brokers: 127.0.0.1:9092)
  • Zookeeper (Node: 127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -xzf kafka_2.13-2.7.0.tgz

Use the default configurations (provided by Apache Kafka) to spin up the instances.

./kafka_2.13-2.7.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.0/config/zookeeper.properties
./kafka_2.13-2.7.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.0/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running
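The fixed sleep 10 above assumes both services come up within ten seconds. A minimal alternative sketch polls the ports instead, assuming that readiness can be approximated by the TCP port accepting connections (an open port does not prove the broker has finished starting). wait_for_port is a hypothetical helper, not part of Kafka or tensorflow-io.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=30.0, poll_interval_s=0.5):
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError (e.g. ConnectionRefusedError) while nothing listens.
            with socket.create_connection((host, port), timeout=poll_interval_s):
                return True
        except OSError:
            time.sleep(poll_interval_s)
    return False


# Hypothetical usage against the local demo instances:
# wait_for_port("127.0.0.1", 2181)  # Zookeeper
# wait_for_port("127.0.0.1", 9092)  # Kafka broker
```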

Once the instances are started as daemon processes, grep for kafka in the process list. The two java processes correspond to the Zookeeper and Kafka instances.

ps -ef | grep kafka
kbuilder  3128 22247  2 19:38 ?        00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000
kbuilder  3562     1 13 19:38 ?        00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.0/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/activation-1.1.1.jar:...:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.0/config/zookeeper.properties
kbuilder  3930     1 52 19:38 ?        00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.0/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/activation-1.1.1.jar:...:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.0/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.0/config/server.properties
kbuilder  4118  3132  0 19:38 pts/0    00:00:00 /bin/bash -c ps -ef | grep kafka
kbuilder  4120  4118  0 19:38 pts/0    00:00:00 grep kafka

Create the Kafka topics with the following specs:

  • susy-train: partitions=1, replication-factor=1
  • susy-test: partitions=2, replication-factor=1
./kafka_2.13-2.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train.
Created topic susy-test.

Describe the topics for details on their configuration

./kafka_2.13-2.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train PartitionCount: 1 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-train Partition: 0  Leader: 0 Replicas: 0   Isr: 0
Topic: susy-test  PartitionCount: 2 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-test  Partition: 0  Leader: 0 Replicas: 0   Isr: 0
    Topic: susy-test  Partition: 1  Leader: 0 Replicas: 0   Isr: 0

A replication factor of 1 indicates that the data is not being replicated. This is because our Kafka setup has only a single broker. In production systems, a cluster can consist of hundreds of broker nodes, and that is where fault tolerance through replication comes into the picture.

See the docs for more details.

SUSY Dataset

Since Kafka is an event streaming platform, data from various sources can be written into it. For instance:

  • Web traffic logs
  • Astronomical measurements
  • IoT sensor data
  • Product reviews and many more.

For the purpose of this tutorial, let's download the SUSY dataset and feed the data into Kafka manually. The goal of this classification problem is to distinguish between a signal process that produces supersymmetric particles and a background process that does not.

curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

Explore the dataset

The first column is the class label (1 for signal, 0 for background), followed by 18 features (8 low-level features, then 10 high-level features). The first 8 features are kinematic properties measured by the particle detectors in the accelerator. The last 10 features are functions of the first 8; they are high-level features derived by physicists to help discriminate between the two classes.

COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

The entire dataset consists of 5 million rows. However, for the purpose of this tutorial, let's consider only a fraction of the dataset (100,000 rows) so that less time is spent on moving the data and more time on understanding the functionality of the API.

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)
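Because the classes are not perfectly balanced, a useful reference point for the accuracy numbers later in the tutorial is the majority-class baseline: a trivial model that always predicts background. The arithmetic below only uses the counts printed above.

```python
# Class counts reported above for the 100,000-row sample.
n_background, n_signal = 54025, 45975
total = n_background + n_signal

# Accuracy of a trivial classifier that always predicts the majority class (background).
majority_baseline = max(n_background, n_signal) / total

print(f"background share: {n_background / total:.2%}")
print(f"majority-class baseline accuracy: {majority_baseline:.4f}")
```

Any trained model should clearly beat this roughly 54% baseline before its accuracy is considered meaningful.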

Split the dataset

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the kafka message keys so that the data is stored
# in multiple partitions, which enables efficient data retrieval
# using consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples:  60000
Number of testing sample:  40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)
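The comment in the code above relies on how a Kafka producer picks a partition for keyed messages. As a sketch, assuming kafka-python's default partitioner follows the Java client (a murmur2 hash of the key bytes, sign bit cleared, modulo the partition count), every message with the same label key lands in the same partition. murmur2 and partition_for_key below are a hypothetical re-implementation for illustration, not imports from kafka-python.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash in the style of Kafka's key partitioner (unsigned result)."""
    length = len(data)
    seed = 0x9747B28C
    m = 0x5BD1E995
    r = 24
    mask = 0xFFFFFFFF

    h = (seed ^ length) & mask
    for i in range(length // 4):
        i4 = i * 4
        # Read a little-endian 4-byte block.
        k = data[i4] | (data[i4 + 1] << 8) | (data[i4 + 2] << 16) | (data[i4 + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Mix in the 1-3 trailing bytes (mirrors the fall-through switch in the Java client).
    tail = length & ~3
    remaining = length % 4
    if remaining >= 3:
        h ^= data[tail + 2] << 16
    if remaining >= 2:
        h ^= data[tail + 1] << 8
    if remaining >= 1:
        h ^= data[tail]
        h = (h * m) & mask

    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    # Clear the sign bit, then take the modulo over the partition count.
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


print(partition_for_key(b"0.0", 2), partition_for_key(b"1.0", 2))
```

Because susy-test only ever receives two distinct keys, its per-partition message counts follow the class split rather than an even spread, which is consistent with the offsets (21592 and 18408) reported later for the testcg consumer group.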

Store the train and test data in Kafka

Storing the data in Kafka simulates an environment of continuous remote data retrieval for training and inference purposes.

def error_callback(exc):
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test
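Each Kafka record written above carries one CSV row of the 18 features as its value and the label string as its key. The pure-Python sketch below shows that encoding and the reverse step, mirroring what decode_kafka_item later does with tf.io.decode_csv and tf.strings.to_number. encode_record and decode_record are hypothetical helpers, and the float formatting from csv.writer may differ slightly from pandas' to_csv.

```python
import csv
import io


def encode_record(features, label):
    """Encode one row as (key, value) bytes, like the producer loop above."""
    buf = io.StringIO()
    # lineterminator="" keeps the value a single CSV line without a trailing newline.
    csv.writer(buf, lineterminator="").writerow(features)
    return str(label).encode("utf-8"), buf.getvalue().encode("utf-8")


def decode_record(key, value):
    """Pure-Python counterpart of decode_kafka_item: CSV floats plus a float label."""
    features = [float(x) for x in value.decode("utf-8").split(",")]
    return features, float(key.decode("utf-8"))


key, value = encode_record([0.5, -1.25, 3.0], 1.0)
print(key, value)
print(decode_record(key, value))
```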

Define the tfio train dataset

The IODataset class is used for streaming data from Kafka into TensorFlow. The class inherits from tf.data.Dataset and thus has all the useful functionality of tf.data.Dataset out of the box.

def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
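Two properties of this pipeline can be checked by hand. First, with 60000 training records and BATCH_SIZE=64, one epoch is ceil(60000 / 64) = 938 steps, and the 40000 test records give 625 steps, matching the progress bars in the training and evaluation logs. Second, shuffle only mixes records within a buffer of SHUFFLE_BUFFER_SIZE elements. The sketch below approximates that buffer behavior in plain Python, based on the documented semantics of tf.data.Dataset.shuffle (fill a buffer, sample uniformly from it, refill); it is not TensorFlow's implementation, and steps_per_epoch and buffered_shuffle are hypothetical names.

```python
import math
import random


def steps_per_epoch(num_examples, batch_size):
    # The final partial batch is kept, as with Dataset.batch(drop_remainder=False).
    return math.ceil(num_examples / batch_size)


def buffered_shuffle(iterable, buffer_size, seed=None):
    """Approximate a shuffle buffer: sample uniformly from a fixed-size window."""
    rng = random.Random(seed)
    buffer = []
    for item in iterable:
        if len(buffer) < buffer_size:
            buffer.append(item)
            continue
        # Emit a random buffered element and put the new item in its slot.
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item
    # Drain whatever is left once the input is exhausted.
    rng.shuffle(buffer)
    yield from buffer


print(steps_per_epoch(60000, 64))  # 938
print(steps_per_epoch(40000, 64))  # 625
```

With a buffer of 64 over a topic that is read in order, the first output record is always one of the first 64 messages. Here that is acceptable because train_test_split already shuffled the rows before they were written to Kafka.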

Build and train the model

# Set the parameters

OPTIMIZER="adam"
# The last Dense layer already applies a sigmoid, so the model outputs probabilities, not logits.
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=False)
METRICS=['accuracy']
EPOCHS=10
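BinaryCrossentropy(from_logits=True) expects raw logits and applies the sigmoid itself, whereas the model below ends with Dense(1, activation='sigmoid') and therefore outputs probabilities. The stdlib sketch below uses the textbook binary cross-entropy formula (ignoring the small epsilon clipping Keras applies) to show what happens when a probability is treated as a logit. Whether this bites in practice may depend on the Keras version, since some newer versions can recover the logits from a sigmoid output and warn instead, but matching from_logits to the output layer states the intent explicitly.

```python
import math


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def bce_from_probability(p, y):
    # Binary cross-entropy when the model output p is already a probability.
    return -(y * math.log(p) + (1 - y) * math.log(1 - p))


def bce_from_logit(z, y):
    # Binary cross-entropy when the model output z is a raw logit (sigmoid applied here).
    return bce_from_probability(sigmoid(z), y)


p = 0.9  # a confident, correct sigmoid output for a positive (label 1) example
print(bce_from_probability(p, 1))  # about 0.105
print(bce_from_logit(p, 1))        # about 0.341, because the sigmoid is applied twice
print(bce_from_logit(1.0, 1))      # about 0.313, the floor for a positive example
```

Since sigmoid(p) for p in [0, 1] never exceeds sigmoid(1) ≈ 0.731, a doubly squashed loss cannot reward confident predictions properly; pairing a sigmoid output with from_logits=False (or dropping the activation and keeping from_logits=True) avoids this.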
# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
dense (Dense)                (None, 128)               2432      
_________________________________________________________________
dropout (Dropout)            (None, 128)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 256)               33024     
_________________________________________________________________
dropout_1 (Dropout)          (None, 256)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 128)               32896     
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 129       
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
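The parameter counts in the summary follow from the Dense layer formula params = inputs × units + units (one bias per unit); Dropout layers have no weights. A quick check, assuming the 18 input features used above:

```python
def dense_params(n_in, n_out):
    # A Dense layer has an n_in x n_out kernel plus one bias per output unit.
    return n_in * n_out + n_out


num_features = 18  # SUSY features, label excluded
layer_sizes = [num_features, 128, 256, 128, 1]  # Dropout layers add no parameters

per_layer = [dense_params(a, b) for a, b in zip(layer_sizes, layer_sizes[1:])]
print(per_layer)       # [2432, 33024, 32896, 129]
print(sum(per_layer))  # 68481
```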
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
938/938 [==============================] - 31s 32ms/step - loss: 0.5218 - accuracy: 0.7379
Epoch 2/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4578 - accuracy: 0.7858
Epoch 3/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4516 - accuracy: 0.7906
Epoch 4/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4492 - accuracy: 0.7908
Epoch 5/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4458 - accuracy: 0.7944
Epoch 6/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7947
Epoch 7/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7939
Epoch 8/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4442 - accuracy: 0.7941
Epoch 9/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7951
Epoch 10/10
938/938 [==============================] - 30s 32ms/step - loss: 0.4423 - accuracy: 0.7962
<tensorflow.python.keras.callbacks.History at 0x7fe8b8405b00>

Since only a fraction of the dataset is being used, our accuracy is limited to ~79% during the training phase. However, feel free to store additional data in Kafka for better model performance. Also, since the goal was only to demonstrate the functionality of the tfio Kafka datasets, a smaller and less complex neural network was used. One can, however, increase the complexity of the model, modify the learning strategy, or tune the hyperparameters for exploration purposes. For a baseline approach, refer to this article.

Infer on the test data

To infer on the test data while adhering to 'exactly-once' semantics along with fault tolerance, streaming.KafkaGroupIODataset can be used.

Define the tfio test dataset

The stream_timeout parameter blocks for the given duration while waiting for new data points to be streamed into the topic. This removes the need to create new datasets if the data is being streamed into the topic intermittently.

test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
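The blocking behavior that stream_timeout controls can be illustrated without Kafka. The sketch below is a pure-Python emulation under a simple assumed model: the consumer keeps yielding messages and stops once nothing new arrives within the timeout, or never stops when the timeout is -1. consume_with_timeout and the in-memory queue are hypothetical stand-ins, not tfio internals.

```python
import queue
import threading
import time


def consume_with_timeout(q, stream_timeout_ms):
    """Yield messages until none arrive for stream_timeout_ms (-1 blocks forever)."""
    timeout = None if stream_timeout_ms == -1 else stream_timeout_ms / 1000.0
    while True:
        try:
            yield q.get(timeout=timeout)
        except queue.Empty:
            return  # nothing new within the timeout, so the stream ends


q = queue.Queue()
for i in range(3):
    q.put(f"msg-{i}")

# A late producer writes one more message after 0.2 s, well inside the 1 s timeout.
threading.Timer(0.2, q.put, args=("msg-late",)).start()

start = time.monotonic()
received = list(consume_with_timeout(q, stream_timeout_ms=1000))
print(received)  # ['msg-0', 'msg-1', 'msg-2', 'msg-late']
print(f"stopped after ~{time.monotonic() - start:.1f} s")
```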

Although this class can be used for training purposes, there are caveats that need to be addressed. Once all the messages have been read from Kafka and the latest offsets have been committed using streaming.KafkaGroupIODataset, the consumer does not restart reading the messages from the beginning. Thus, while training, it is only possible to train for a single epoch with the data continuously flowing in. This kind of functionality has limited use cases during the training phase, where a data point, once consumed by the model, is no longer required and can be discarded.

However, this functionality shines when it comes to robust inference with exactly-once semantics.

Evaluate the performance on the test data

res = model.evaluate(test_ds)
print("test loss, test acc:", res)
625/625 [==============================] - 13s 21ms/step - loss: 0.4367 - accuracy: 0.7980
test loss, test acc: [0.4366855025291443, 0.7980499863624573]

As the inference is based on 'exactly-once' semantics, the evaluation on the test set can be run only once. To run the inference on the test data again, a new consumer group must be used.
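One simple way to obtain a new consumer group for each re-run is to append a random suffix to the group id. The helper below is a hypothetical sketch; its result would be passed as `group_id=` when constructing a new `KafkaGroupIODataset`. A random suffix makes a collision with an existing group very unlikely, though not strictly impossible.

```python
import uuid


def fresh_group_id(prefix="testcg"):
    """Return a new consumer group id such as 'testcg-1a2b3c4d' (hypothetical helper)."""
    # 8 hex characters from a random UUID; collisions are very unlikely.
    return f"{prefix}-{uuid.uuid4().hex[:8]}"
```

Because the new group has no committed offsets, `auto.offset.reset=earliest` makes it start reading the test topic from the first message again.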

Track the offset lag of the testcg consumer group

./kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
testcg          susy-test       0          21592           21592           0               rdkafka-66bc39ef-397c-4190-a17c-69ab519d4f53 /10.142.0.106   rdkafka
testcg          susy-test       1          18408           18408           0               rdkafka-66bc39ef-397c-4190-a17c-69ab519d4f53 /10.142.0.106   rdkafka

Once the current-offset matches the log-end-offset for all the partitions, it indicates that the consumer(s) have finished fetching all the messages from the Kafka topic.
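That check can be automated by parsing the table printed by `kafka-consumer-groups.sh --describe`. The sketch below assumes the whitespace-separated column layout shown above (header row starting with `GROUP`, no spaces inside values); `partition_lags` and `fully_consumed` are hypothetical helper names, and a `-` in CURRENT-OFFSET (no commit yet) is treated as unknown lag.

```python
def partition_lags(describe_output):
    """Map (topic, partition) -> lag (None if no offset is committed yet)."""
    lines = [ln for ln in describe_output.splitlines() if ln.strip()]
    # Locate the header row; any preceding status lines are skipped.
    start = next(i for i, ln in enumerate(lines) if ln.split()[:1] == ["GROUP"])
    col = {name: i for i, name in enumerate(lines[start].split())}
    lags = {}
    for ln in lines[start + 1:]:
        fields = ln.split()
        current = fields[col["CURRENT-OFFSET"]]
        log_end = int(fields[col["LOG-END-OFFSET"]])
        lag = None if current == "-" else log_end - int(current)
        lags[(fields[col["TOPIC"]], int(fields[col["PARTITION"]]))] = lag
    return lags


def fully_consumed(describe_output):
    """True when current-offset equals log-end-offset for every partition."""
    lags = partition_lags(describe_output)
    return bool(lags) and all(lag == 0 for lag in lags.values())
```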

Online learning

The online machine learning paradigm differs slightly from the traditional way of training machine learning models. In the former, the model keeps incrementally learning and updating its parameters as soon as new data points become available, and this process is expected to continue indefinitely. This is unlike the latter approach, where the dataset is fixed and the model iterates over it n times. In online learning, data that the model has consumed once may not be available for training again.
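To make the contrast concrete, here is a minimal pure-Python sketch of online learning. It assumes a logistic-regression model trained with plain SGD as a stand-in for the Keras model (it is not what this tutorial trains), and uses synthetic data in place of a Kafka stream. Each mini-batch is used for one update pass and then discarded instead of being revisited over n epochs; `OnlineLogisticRegression` and `stream_of_batches` are hypothetical names.

```python
import math
import random


def _sigmoid(z):
    # Numerically stable logistic function.
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


class OnlineLogisticRegression:
    def __init__(self, n_features, lr=0.1):
        self.w = [0.0] * n_features
        self.b = 0.0
        self.lr = lr

    def predict_proba(self, x):
        return _sigmoid(self.b + sum(wi * xi for wi, xi in zip(self.w, x)))

    def partial_fit(self, batch):
        """One SGD step per (features, label) example in the incoming batch."""
        for x, y in batch:
            err = self.predict_proba(x) - y  # gradient of log-loss w.r.t. the logit
            self.w = [wi - self.lr * err * xi for wi, xi in zip(self.w, x)]
            self.b -= self.lr * err


def stream_of_batches(n_batches, batch_size, seed=0):
    """Simulated stream: label is 1 when x0 + x1 > 0."""
    rng = random.Random(seed)
    for _ in range(n_batches):
        batch = []
        for _ in range(batch_size):
            x = [rng.uniform(-1, 1), rng.uniform(-1, 1)]
            batch.append((x, 1.0 if x[0] + x[1] > 0 else 0.0))
        yield batch


model = OnlineLogisticRegression(n_features=2, lr=0.5)
for batch in stream_of_batches(n_batches=50, batch_size=32):
    model.partial_fit(batch)  # each batch is seen exactly once, then dropped
```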

By using streaming.KafkaBatchIODataset, it is now possible to train models in this fashion. Let's continue to use our SUSY dataset to demonstrate this functionality.

tfio training dataset for online learning

streaming.KafkaBatchIODataset is similar to streaming.KafkaGroupIODataset in its API. Additionally, it is recommended to use the stream_timeout parameter to configure how long the dataset blocks waiting for new messages before timing out. In the example below, the dataset is configured with a stream_timeout of 10000 milliseconds. This means that, after all the messages in the topic have been consumed, the dataset waits an additional 10 seconds before timing out and disconnecting from the Kafka cluster. If new messages are streamed into the topic before the timeout, data consumption and model training resume for those newly consumed data points. To block indefinitely, set it to -1.
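The timeout behaviour can be pictured with the standard library's `queue.Queue`. This is a conceptual sketch, not tfio's implementation: `consume` and `intermittent_producer` are hypothetical, the wait restarts after every message (so gaps shorter than the timeout do not end the stream), and -1 maps to a blocking `get()` with no timeout.

```python
import queue
import threading
import time


def consume(q, stream_timeout_ms):
    """Yield messages until none arrives within stream_timeout_ms (-1 blocks forever)."""
    timeout = None if stream_timeout_ms == -1 else stream_timeout_ms / 1000.0
    while True:
        try:
            message = q.get(timeout=timeout)
        except queue.Empty:
            return  # timed out: stop iterating (analogous to disconnecting)
        yield message


def intermittent_producer(q, messages, gap_s):
    """Hypothetical producer that streams messages with pauses between them."""
    for m in messages:
        q.put(m)
        time.sleep(gap_s)


q = queue.Queue()
producer = threading.Thread(target=intermittent_producer, args=(q, range(5), 0.05))
producer.start()
# 50 ms gaps are shorter than the 500 ms timeout, so consumption resumes after each gap.
print(list(consume(q, stream_timeout_ms=500)))  # prints [0, 1, 2, 3, 4]
producer.join()
```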

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

Every item that online_train_ds generates is itself a tf.data.Dataset. Thus, all the standard transformations can be applied as usual.
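Conceptually, each mini-dataset goes through its own shuffle, decode, and batch steps before the model is updated on it. The pure-Python sketch below mimics that per-chunk pipeline under the assumption (matching the decode function used here) that the message is a CSV row of float features and the key is the label. `decode_item` and `process_chunk` are hypothetical names. Note one difference: this sketch shuffles the whole chunk, whereas tf.data's `shuffle(buffer_size=32)` only shuffles within a 32-element buffer.

```python
import random


def decode_item(raw_message, raw_key):
    """CSV bytes -> list of floats; key bytes -> float label."""
    features = [float(v) for v in raw_message.decode("utf-8").split(",")]
    label = float(raw_key.decode("utf-8"))
    return features, label


def process_chunk(chunk, batch_size=32, seed=None):
    """Shuffle, decode, and batch one chunk of (raw_message, raw_key) pairs."""
    items = list(chunk)
    random.Random(seed).shuffle(items)  # full shuffle of this chunk only
    decoded = [decode_item(m, k) for m, k in items]
    # The last batch may be smaller than batch_size, as with tf.data's batch().
    return [decoded[i:i + batch_size] for i in range(0, len(decoded), batch_size)]
```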

def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

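for mini_ds in online_train_ds:
  # Each mini_ds is itself a tf.data.Dataset of (raw_message, raw_key) pairs.
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  # Incrementally update the same model on the newly consumed data points.
  model.fit(mini_ds, epochs=3)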
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4279 - accuracy: 0.8115
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4130 - accuracy: 0.8213
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.3996 - accuracy: 0.8242
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4646 - accuracy: 0.7920
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4415 - accuracy: 0.8047
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4296 - accuracy: 0.8018
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4410 - accuracy: 0.7969
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4223 - accuracy: 0.8174
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4187 - accuracy: 0.8096
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4679 - accuracy: 0.7734
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4292 - accuracy: 0.7979
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4411 - accuracy: 0.7959
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4750 - accuracy: 0.7695
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4460 - accuracy: 0.7959
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4345 - accuracy: 0.8008
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4282 - accuracy: 0.8154
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4086 - accuracy: 0.8184
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.3942 - accuracy: 0.8291
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4265 - accuracy: 0.7852
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4233 - accuracy: 0.7988
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4037 - accuracy: 0.8164
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4463 - accuracy: 0.7988
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4064 - accuracy: 0.8164
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4093 - accuracy: 0.8115
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4652 - accuracy: 0.7930
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4508 - accuracy: 0.7930
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4296 - accuracy: 0.8018
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4671 - accuracy: 0.7812
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4519 - accuracy: 0.7910
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4431 - accuracy: 0.7930
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4905 - accuracy: 0.7637
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4691 - accuracy: 0.7764
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4518 - accuracy: 0.7803
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4379 - accuracy: 0.7930
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4314 - accuracy: 0.8057
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4232 - accuracy: 0.7988
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4892 - accuracy: 0.7539
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4660 - accuracy: 0.7773
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4572 - accuracy: 0.7842
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4474 - accuracy: 0.7871
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4435 - accuracy: 0.7920
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4247 - accuracy: 0.8047
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4306 - accuracy: 0.8115
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4219 - accuracy: 0.8164
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4017 - accuracy: 0.8223
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4662 - accuracy: 0.7881
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4563 - accuracy: 0.7988
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4445 - accuracy: 0.7930
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4437 - accuracy: 0.7910
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4282 - accuracy: 0.7988
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4203 - accuracy: 0.8057
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4227 - accuracy: 0.7998
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4005 - accuracy: 0.8223
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.3905 - accuracy: 0.8184
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4617 - accuracy: 0.7871
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4414 - accuracy: 0.7891
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4320 - accuracy: 0.8018
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4493 - accuracy: 0.7900
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4418 - accuracy: 0.8037
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4265 - accuracy: 0.8096
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4740 - accuracy: 0.7783
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4498 - accuracy: 0.7812
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4473 - accuracy: 0.7910
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4439 - accuracy: 0.7900
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4217 - accuracy: 0.8047
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4126 - accuracy: 0.8135
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4417 - accuracy: 0.7998
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4120 - accuracy: 0.8105
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.3967 - accuracy: 0.8213
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4657 - accuracy: 0.7822
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4437 - accuracy: 0.7998
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4344 - accuracy: 0.8057
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4385 - accuracy: 0.8018
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4314 - accuracy: 0.8096
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4186 - accuracy: 0.8086
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4918 - accuracy: 0.7725
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4719 - accuracy: 0.7725
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4586 - accuracy: 0.7920
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4989 - accuracy: 0.7607
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4835 - accuracy: 0.7676
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4796 - accuracy: 0.7695
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4645 - accuracy: 0.7754
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4505 - accuracy: 0.7812
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4420 - accuracy: 0.7852
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4219 - accuracy: 0.8047
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4029 - accuracy: 0.8135
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.3870 - accuracy: 0.8252
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4627 - accuracy: 0.7871
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4313 - accuracy: 0.8066
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4324 - accuracy: 0.8066
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4532 - accuracy: 0.7959
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4370 - accuracy: 0.7871
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4301 - accuracy: 0.8027
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4319 - accuracy: 0.7910
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4268 - accuracy: 0.7979
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4096 - accuracy: 0.8154
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4738 - accuracy: 0.7734
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4411 - accuracy: 0.7842
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4410 - accuracy: 0.7754
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4437 - accuracy: 0.7881
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4273 - accuracy: 0.8047
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4030 - accuracy: 0.8193
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4604 - accuracy: 0.7725
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4432 - accuracy: 0.7744
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4299 - accuracy: 0.7969
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4276 - accuracy: 0.8037
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4138 - accuracy: 0.8076
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4009 - accuracy: 0.8105
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4273 - accuracy: 0.8086
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4049 - accuracy: 0.8193
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.3974 - accuracy: 0.8252
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4755 - accuracy: 0.7705
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4454 - accuracy: 0.7832
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4358 - accuracy: 0.7939
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4691 - accuracy: 0.7686
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4572 - accuracy: 0.7793
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4484 - accuracy: 0.7979
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4292 - accuracy: 0.8066
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4220 - accuracy: 0.8145
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4183 - accuracy: 0.8115
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4420 - accuracy: 0.7930
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4384 - accuracy: 0.7939
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4354 - accuracy: 0.7881
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4731 - accuracy: 0.7666
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4544 - accuracy: 0.7764
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4451 - accuracy: 0.7812
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4211 - accuracy: 0.8047
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4097 - accuracy: 0.8047
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4013 - accuracy: 0.8164
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4625 - accuracy: 0.7832
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4405 - accuracy: 0.7998
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4224 - accuracy: 0.8066
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4404 - accuracy: 0.7881
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4227 - accuracy: 0.7959
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4079 - accuracy: 0.8066
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4570 - accuracy: 0.7910
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4355 - accuracy: 0.8135
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4281 - accuracy: 0.7998
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4721 - accuracy: 0.7891
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4606 - accuracy: 0.7969
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4401 - accuracy: 0.8008
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4666 - accuracy: 0.7842
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4574 - accuracy: 0.7871
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4460 - accuracy: 0.7920
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4538 - accuracy: 0.7969
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4420 - accuracy: 0.8047
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4189 - accuracy: 0.8086
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4766 - accuracy: 0.7627
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4589 - accuracy: 0.7725
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4550 - accuracy: 0.7725
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4338 - accuracy: 0.8018
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4217 - accuracy: 0.8057
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4044 - accuracy: 0.8154
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4523 - accuracy: 0.7881
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4451 - accuracy: 0.7920
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4208 - accuracy: 0.8018
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4711 - accuracy: 0.7666
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4518 - accuracy: 0.7852
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4293 - accuracy: 0.7930
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4786 - accuracy: 0.7783
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4586 - accuracy: 0.7842
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4426 - accuracy: 0.7881
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4392 - accuracy: 0.8115
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4181 - accuracy: 0.8252
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4134 - accuracy: 0.8223
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4380 - accuracy: 0.7998
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4271 - accuracy: 0.8018
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4118 - accuracy: 0.8105
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4469 - accuracy: 0.7939
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4358 - accuracy: 0.7959
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4253 - accuracy: 0.8018
Epoch 1/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4574 - accuracy: 0.8018
Epoch 2/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4531 - accuracy: 0.8018
Epoch 3/3
32/32 [==============================] - 0s 3ms/step - loss: 0.4392 - accuracy: 0.7988
Epoch 1/3
19/19 [==============================] - 0s 3ms/step - loss: 0.4244 - accuracy: 0.8191
Epoch 2/3
19/19 [==============================] - 0s 3ms/step - loss: 0.4049 - accuracy: 0.8339
Epoch 3/3
19/19 [==============================] - 0s 3ms/step - loss: 0.3980 - accuracy: 0.8224

The incrementally trained model can be saved periodically (depending on the use case) and used to infer on the test data in either online or offline mode.
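A minimal sketch of periodic checkpointing follows, assuming model parameters can be represented as a JSON-serializable dict. With Keras, the save call would typically be `model.save(...)` inside the training loop instead. `PeriodicCheckpointer` and `load_latest` are hypothetical helpers; each checkpoint is written to a temporary file and renamed with `os.replace`, so a reader doing offline inference never sees a half-written file.

```python
import json
import os


class PeriodicCheckpointer:
    """Save parameters after every `every` incremental updates (hypothetical helper)."""

    def __init__(self, directory, every):
        self.directory = directory
        self.every = every
        self.updates = 0
        self.saved = []

    def step(self, params):
        self.updates += 1
        if self.updates % self.every == 0:
            path = os.path.join(self.directory, f"ckpt-{self.updates:06d}.json")
            tmp = path + ".tmp"
            with open(tmp, "w") as f:
                json.dump(params, f)
            os.replace(tmp, path)  # atomic rename on the same filesystem
            self.saved.append(path)


def load_latest(directory):
    """Return the most recent checkpoint's parameters, or None if there is none."""
    names = sorted(n for n in os.listdir(directory)
                   if n.startswith("ckpt-") and n.endswith(".json"))
    if not names:
        return None
    with open(os.path.join(directory, names[-1])) as f:
        return json.load(f)
```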

References:

  • Baldi, P., P. Sadowski, and D. Whiteson. "Searching for Exotic Particles in High-Energy Physics with Deep Learning." Nature Communications 5 (July 2, 2014)

  • SUSY Dataset: https://archive.ics.uci.edu/ml/datasets/SUSY#