কাফকা এবং টেন্সরফ্লো-আইও ব্যবহার করে স্ট্রিমিং ডেটাতে শক্তিশালী মেশিন লার্নিং

TensorFlow.org-এ দেখুন Google Colab-এ চালান GitHub-এ উৎস দেখুন নোটবুক ডাউনলোড করুন

ওভারভিউ

এই টিউটোরিয়ালটি একটি থেকে তথ্য স্ট্রিমিং উপর দৃষ্টি নিবদ্ধ করে কাফকা একটি মধ্যে ক্লাস্টার tf.data.Dataset যা পরে সাথে ব্যবহার করা হয় tf.keras প্রশিক্ষণ ও অনুমান জন্য।

কাফকা প্রাথমিকভাবে একটি বিতরণ করা ইভেন্ট-স্ট্রিমিং প্ল্যাটফর্ম যা ডেটা পাইপলাইন জুড়ে স্কেলযোগ্য এবং ত্রুটি-সহনশীল স্ট্রিমিং ডেটা সরবরাহ করে। এটি প্রধান উদ্যোগগুলির আধিক্যের একটি অপরিহার্য প্রযুক্তিগত উপাদান যেখানে মিশন-সমালোচনামূলক ডেটা সরবরাহ একটি প্রাথমিক প্রয়োজন।

সেটআপ

প্রয়োজনীয় tensorflow-io এবং kafka প্যাকেজ ইনস্টল করুন

pip install tensorflow-io
pip install kafka-python

প্যাকেজ আমদানি করুন

import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

tf এবং tfio আমদানি বৈধ করুন

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.23.1
tensorflow version: 2.8.0-rc0

Kafka এবং Zookeeper দৃষ্টান্ত ডাউনলোড এবং সেটআপ করুন

ডেমো উদ্দেশ্যে, নিম্নলিখিত উদাহরণগুলি স্থানীয়ভাবে সেটআপ করা হয়েছে:

  • কাফকা (দালাল: 127.0.0.1:9092)
  • চিড়িয়াখানা (নোড: 127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.2/kafka_2.13-2.7.2.tgz
tar -xzf kafka_2.13-2.7.2.tgz

দৃষ্টান্তগুলি ঘোরানোর জন্য ডিফল্ট কনফিগারেশন (অ্যাপাচি কাফকা দ্বারা সরবরাহ করা) ব্যবহার করা।

./kafka_2.13-2.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.2/config/zookeeper.properties
./kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.2/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running

একবার দৃষ্টান্ত ডেমন প্রক্রিয়া, জন্য grep হিসাবে শুরু হয় kafka প্রসেস তালিকায়। দুটি জাভা প্রক্রিয়া চিড়িয়াখানা এবং কাফকা দৃষ্টান্তের সাথে মিলে যায়।

ps -ef | grep kafka
kbuilder 27856 20044  4 20:28 ?        00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000
kbuilder 28271     1 16 20:28 ?        00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.2/config/zookeeper.properties
kbuilder 28635     1 57 20:28 ?        00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.2/config/server.properties
kbuilder 28821 27860  0 20:28 pts/0    00:00:00 /bin/bash -c ps -ef | grep kafka
kbuilder 28823 28821  0 20:28 pts/0    00:00:00 grep kafka

নিম্নলিখিত চশমাগুলির সাথে কাফকা বিষয়গুলি তৈরি করুন:

  • susy-train: partitions=1, replication-factor=1
  • susy-পরীক্ষা: পার্টিশন=2, প্রতিলিপি-ফ্যাক্টর=1
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train.
Created topic susy-test.

কনফিগারেশনের বিশদ বিবরণের জন্য বিষয়টি বর্ণনা করুন

./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train PartitionCount: 1 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-train Partition: 0  Leader: 0 Replicas: 0   Isr: 0
Topic: susy-test  PartitionCount: 2 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-test  Partition: 0  Leader: 0 Replicas: 0   Isr: 0
    Topic: susy-test  Partition: 1  Leader: 0 Replicas: 0   Isr: 0

প্রতিলিপি ফ্যাক্টর 1 নির্দেশ করে যে ডেটা প্রতিলিপি করা হচ্ছে না। এটি আমাদের কাফকা সেটআপে একক ব্রোকারের উপস্থিতির কারণে। উৎপাদন ব্যবস্থায়, বুটস্ট্র্যাপ সার্ভারের সংখ্যা 100 এর নোডের মধ্যে হতে পারে। সেখানেই প্রতিলিপি ব্যবহার করে দোষ-সহনশীলতা ছবিতে আসে।

পড়ুন দয়া করে দস্তাবেজ আরো বিস্তারিত জানার জন্য।

SUSY ডেটাসেট

কাফকা একটি ইভেন্ট স্ট্রিমিং প্ল্যাটফর্ম, এটিতে বিভিন্ন উত্স থেকে ডেটা লিখতে সক্ষম করে। এই ক্ষেত্রে:

  • ওয়েব ট্রাফিক লগ
  • জ্যোতির্বিজ্ঞানের পরিমাপ
  • আইওটি সেন্সর ডেটা
  • পণ্য পর্যালোচনা এবং আরো অনেক.

এই টিউটোরিয়ালের উদ্দেশ্যে, ডাউনলোড করতে দেয় SUSY ডেটা সেটটি এবং ম্যানুয়ালি কাফকা ডেটা ভোজন। এই শ্রেণিবিন্যাস সমস্যার লক্ষ্য হল একটি সংকেত প্রক্রিয়ার মধ্যে পার্থক্য করা যা সুপারসিমেট্রিক কণা তৈরি করে এবং একটি পটভূমি প্রক্রিয়া যা করে না।

curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

ডেটাসেট অন্বেষণ করুন

প্রথম কলামটি হল ক্লাস লেবেল (সংকেতের জন্য 1, পটভূমির জন্য 0), তারপরে 18টি বৈশিষ্ট্য (8 নিম্ন-স্তরের বৈশিষ্ট্য তারপর 10টি উচ্চ-স্তরের বৈশিষ্ট্য)। প্রথম 8টি বৈশিষ্ট্য হল গতিগত বৈশিষ্ট্য যা অ্যাক্সিলারেটরের কণা ডিটেক্টর দ্বারা পরিমাপ করা হয়। শেষ 10টি বৈশিষ্ট্য হল প্রথম 8টি বৈশিষ্ট্যের ফাংশন। এই দুটি শ্রেণীর মধ্যে বৈষম্য করতে সাহায্য করার জন্য পদার্থবিদদের দ্বারা প্রাপ্ত উচ্চ-স্তরের বৈশিষ্ট্য।

COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

সম্পূর্ণ ডেটাসেট 5 মিলিয়ন সারি নিয়ে গঠিত। যাইহোক, এই টিউটোরিয়ালের উদ্দেশ্যে, আসুন শুধুমাত্র ডেটাসেটের একটি ভগ্নাংশ (100,000 সারি) বিবেচনা করি যাতে ডেটা সরাতে কম সময় ব্যয় হয় এবং api-এর কার্যকারিতা বোঝার জন্য আরও বেশি সময় ব্যয় হয়।

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)

ডেটাসেট বিভক্ত করুন

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples:  60000
Number of testing sample:  40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)

কাফকাতে ট্রেন এবং পরীক্ষার ডেটা সংরক্ষণ করুন

কাফকাতে ডেটা সংরক্ষণ করা প্রশিক্ষণ এবং অনুমানের উদ্দেশ্যে ক্রমাগত দূরবর্তী ডেটা পুনরুদ্ধারের জন্য একটি পরিবেশকে অনুকরণ করে।

def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test

tfio ট্রেন ডেটাসেট সংজ্ঞায়িত করুন

IODataset বর্গ tensorflow মধ্যে কাফকা থেকে তথ্য স্ট্রিমিং করার জন্য ব্যবহৃত হয়। থেকে বর্গ উত্তরাধিকারী tf.data.Dataset এবং এইভাবে সব দরকারী বৈশিষ্ট্য রয়েছে tf.data.Dataset বাক্সের বাইরে।

def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
2022-01-07 20:29:21.602817: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

মডেল তৈরি এবং প্রশিক্ষণ

# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 dense (Dense)               (None, 128)               2432      
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 256)               33024     
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 128)               32896     
                                                                 
 dropout_2 (Dropout)         (None, 128)               0         
                                                                 
 dense_3 (Dense)             (None, 1)                 129       
                                                                 
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py:1082: UserWarning: "`binary_crossentropy` received `from_logits=True`, but the `output` argument was produced by a sigmoid or softmax activation and thus does not represent logits. Was this intended?"
  return dispatch_target(*args, **kwargs)
938/938 [==============================] - 31s 33ms/step - loss: 0.4817 - accuracy: 0.7691
Epoch 2/10
938/938 [==============================] - 30s 32ms/step - loss: 0.4550 - accuracy: 0.7875
Epoch 3/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4512 - accuracy: 0.7911
Epoch 4/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4487 - accuracy: 0.7940
Epoch 5/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7934
Epoch 6/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4459 - accuracy: 0.7933
Epoch 7/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4448 - accuracy: 0.7935
Epoch 8/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7950
Epoch 9/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7956
Epoch 10/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4425 - accuracy: 0.7962
<keras.callbacks.History at 0x7fb364fd2a90>

যেহেতু ডেটাসেটের শুধুমাত্র একটি ভগ্নাংশ ব্যবহার করা হচ্ছে, তাই প্রশিক্ষণ পর্বের সময় আমাদের নির্ভুলতা ~78% পর্যন্ত সীমাবদ্ধ। যাইহোক, একটি ভাল মডেল পারফরম্যান্সের জন্য অনুগ্রহ করে কাফকাতে অতিরিক্ত ডেটা সঞ্চয় করুন। এছাড়াও, যেহেতু লক্ষ্য টিফিও কাফকা ডেটাসেটের কার্যকারিতা প্রদর্শন করা ছিল, তাই একটি ছোট এবং কম-জটিল নিউরাল নেটওয়ার্ক ব্যবহার করা হয়েছিল। যাইহোক, কেউ মডেলের জটিলতা বাড়াতে পারে, শেখার কৌশল পরিবর্তন করতে পারে, অন্বেষণের উদ্দেশ্যে হাইপার-প্যারামিটার ইত্যাদি টিউন করতে পারে। একটি বেসলাইন পদ্ধতির জন্য, এই পড়ুন দয়া নিবন্ধ

পরীক্ষার তথ্য অনুমান করুন

দোষ সহনশীলতা সহ 'ঠিক-একবার' শব্দার্থবিদ্যা লগ্ন দ্বারা পরীক্ষা ডেটার উপর অনুমান করার জন্য, streaming.KafkaGroupIODataset কাজে লাগানো যায়।

tfio পরীক্ষার ডেটাসেট সংজ্ঞায়িত করুন

stream_timeout নতুন ডাটা পয়েন্টের জন্য দেওয়া চলাকালীন প্যারামিটার ব্লক বিষয় মধ্যে প্রবাহিত হবে। এটি নতুন ডেটাসেট তৈরির প্রয়োজনীয়তাকে সরিয়ে দেয় যদি তথ্যটি একটি অন্তর্বর্তী ফ্যাশনে বিষয়টিতে প্রবাহিত হয়।

test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_io/python/experimental/kafka_group_io_dataset_ops.py:188: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.take_while(...)

যদিও এই শ্রেণীটি প্রশিক্ষণের উদ্দেশ্যে ব্যবহার করা যেতে পারে, তবে কিছু সতর্কতা রয়েছে যা সমাধান করা প্রয়োজন। একবার সব বার্তা কাফকা থেকে পড়া হয় এবং সর্বশেষ অফসেট ব্যবহার প্রতিশ্রুতিবদ্ধ হয় streaming.KafkaGroupIODataset , ভোক্তা শুরু থেকে বার্তা পড়া পুনরায় আরম্ভ করা হয় না। এইভাবে, প্রশিক্ষণের সময়, অবিচ্ছিন্নভাবে ডেটা প্রবাহিত করে শুধুমাত্র একটি একক যুগের জন্য প্রশিক্ষণ দেওয়া সম্ভব। প্রশিক্ষণ পর্বে এই ধরনের কার্যকারিতার সীমিত ব্যবহারের ক্ষেত্রে থাকে যেখানে, একবার একটি ডেটাপয়েন্ট মডেল দ্বারা গ্রাস করা হলে তা আর থাকে না। প্রয়োজন এবং বাতিল করা যেতে পারে।

যাইহোক, এই কার্যকারিতা উজ্জ্বল হয় যখন এটি ঠিক-একবার শব্দার্থবিদ্যার সাথে শক্তিশালী অনুমানের ক্ষেত্রে আসে।

পরীক্ষার ডেটাতে কর্মক্ষমতা মূল্যায়ন করুন

res = model.evaluate(test_ds)
print("test loss, test acc:", res)
34/Unknown - 0s 2ms/step - loss: 0.4434 - accuracy: 0.8194
2022-01-07 20:34:29.402707: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:29.406789: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
625/625 [==============================] - 11s 17ms/step - loss: 0.4437 - accuracy: 0.7915
test loss, test acc: [0.4436523914337158, 0.7915250062942505]
2022-01-07 20:34:40.051954: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

যেহেতু অনুমানটি 'ঠিক-একবার' শব্দার্থবিদ্যার উপর ভিত্তি করে, তাই পরীক্ষার সেটে মূল্যায়ন শুধুমাত্র একবার চালানো যেতে পারে। পরীক্ষার ডেটাতে আবার অনুমান চালানোর জন্য, একটি নতুন ভোক্তা গোষ্ঠী ব্যবহার করা উচিত।

অফসেট ল্যাগ ট্র্যাক testcg ভোক্তা গ্রুপ

./kafka_2.13-2.7.2/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
testcg          susy-test       0          21626           21626           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka
testcg          susy-test       1          18374           18374           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka

একবার current-offset ম্যাচ log-end-offset সমস্ত পার্টিশন জন্য, এটি নির্দেশ করে যে ভোক্তা (গুলি) সব কাফকা বিষয় থেকে বার্তা আনয়ন সম্পন্ন করেছেন।

অনলাইন শিক্ষা

অনলাইন মেশিন লার্নিং দৃষ্টান্তটি মেশিন লার্নিং মডেল প্রশিক্ষণের প্রথাগত/প্রচলিত পদ্ধতি থেকে একটু ভিন্ন। প্রাক্তন ক্ষেত্রে, নতুন ডেটা পয়েন্ট উপলব্ধ হওয়ার সাথে সাথে মডেলটি ক্রমবর্ধমানভাবে এর পরামিতিগুলি শিখতে/আপডেট করতে থাকে এবং এই প্রক্রিয়াটি অনির্দিষ্টকালের জন্য অব্যাহত থাকবে বলে আশা করা হচ্ছে। এই আধুনিক পন্থা যেখানে ডেটা সেটটি সংশোধন করা হয়েছে অসদৃশ হয় এবং মডেল এটি উপর iterates n যতবার। অনলাইন শেখার ক্ষেত্রে, মডেল দ্বারা একবার ব্যবহার করা ডেটা আবার প্রশিক্ষণের জন্য উপলব্ধ নাও হতে পারে।

ব্যবহার দ্বারা streaming.KafkaBatchIODataset , এটি এখন এই ফ্যাশন মডেলগুলির প্রশিক্ষণ করা সম্ভব। আসুন এই কার্যকারিতা প্রদর্শনের জন্য আমাদের SUSY ডেটাসেট ব্যবহার করা চালিয়ে যাই।

অনলাইন শিক্ষার জন্য tfio প্রশিক্ষণ ডেটাসেট

streaming.KafkaBatchIODataset অনুরূপ streaming.KafkaGroupIODataset এটা API এ। উপরন্তু, এটা ব্যবহার করতে সুপারিশ করা হয় stream_timeout সময়কাল, যার জন্য ডেটা সেটটি বাইরে সময়জ্ঞান সামনে নতুন বার্তার জন্য অবরুদ্ধ করবে কনফিগার করতে প্যারামিটার। নিচে ইনস্ট্যান্সের মধ্যে, ডেটা সেটটি একটি মাধ্যমে কনফিগার করা হয়েছে stream_timeout এর 10000 মিলিসেকেন্ড। এটি বোঝায় যে, বিষয়ের সমস্ত বার্তা গ্রহণ করার পরে, ডেটাসেটটি কাফকা ক্লাস্টার থেকে টাইম আউট এবং সংযোগ বিচ্ছিন্ন করার আগে অতিরিক্ত 10 সেকেন্ডের জন্য অপেক্ষা করবে। সময় শেষ হওয়ার আগে যদি নতুন বার্তাগুলি বিষয়ের মধ্যে স্ট্রীম করা হয়, তবে সেই নতুনভাবে ব্যবহৃত ডেটা পয়েন্টগুলির জন্য ডেটা খরচ এবং মডেল প্রশিক্ষণ পুনরায় শুরু হয়। অনির্দিষ্টকালের জন্য অবরুদ্ধ করতে চাইলে, এটি সেট -1

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

প্রতিটি আইটেম যে online_train_ds উত্পন্ন একটি হল tf.data.Dataset নিজেই। এইভাবে, সমস্ত আদর্শ রূপান্তর স্বাভাবিক হিসাবে প্রয়োগ করা যেতে পারে।

def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  if len(mini_ds) > 0:
    model.fit(mini_ds, epochs=3)
2022-01-07 20:34:42.024915: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:42.025797: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4561 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4538 - accuracy: 0.7909
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4499 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4347 - accuracy: 0.8018
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4314 - accuracy: 0.8048
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4286 - accuracy: 0.8063
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4480 - accuracy: 0.7910
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4425 - accuracy: 0.7945
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4390 - accuracy: 0.7970
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4434 - accuracy: 0.7965
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4380 - accuracy: 0.7974
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4354 - accuracy: 0.7992
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4522 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4475 - accuracy: 0.7910
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4435 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4464 - accuracy: 0.7906
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4467 - accuracy: 0.7922
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4424 - accuracy: 0.7933
2022-01-07 20:35:04.916208: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

ক্রমবর্ধমানভাবে প্রশিক্ষিত মডেলটি পর্যায়ক্রমিক ফ্যাশনে সংরক্ষণ করা যেতে পারে (ব্যবহারের ক্ষেত্রের উপর ভিত্তি করে) এবং অনলাইন বা অফলাইন মোডে পরীক্ষার ডেটা অনুমান করতে ব্যবহার করা যেতে পারে।

তথ্যসূত্র:

  • বাল্ডি, পি., পি. সাডোস্কি এবং ডি. হোয়াইটসন। "গভীর শিক্ষার সাথে উচ্চ-শক্তি পদার্থবিদ্যায় বহিরাগত কণাগুলির জন্য অনুসন্ধান করা।" প্রকৃতি যোগাযোগ 5 (জুলাই 2, 2014)

  • SUSY ডেটা সেটটি: https://archive.ics.uci.edu/ml/datasets/SUSY#