Kafka ve Tensorflow-IO kullanarak veri akışında güçlü makine öğrenimi

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

genel bakış

Bu öğretici bir veri akışı üzerinde duruluyor Kafka bir içine küme tf.data.Dataset sonra birlikte kullanıldığında tf.keras eğitim ve çıkarım için.

Kafka, öncelikle veri boru hatları arasında ölçeklenebilir ve hataya dayanıklı akış verileri sağlayan dağıtılmış bir olay akışı platformudur. Görev açısından kritik veri tesliminin birincil gereksinim olduğu çok sayıda büyük kuruluşun önemli bir teknik bileşenidir.

Kurmak

Gerekli tensorflow-io ve kafka paketlerini kurun

pip install tensorflow-io
pip install kafka-python

Paketleri içe aktar

import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

tf ve tfio içe aktarmalarını doğrulama

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.23.1
tensorflow version: 2.8.0-rc0

Kafka ve Zookeeper örneklerini indirin ve kurun

Demo amacıyla, aşağıdaki örnekler yerel olarak kurulur:

  • Kafka (Komisyoncular: 127.0.0.1:9092)
  • Hayvan Bekçisi (Düğüm: 127.0.0.1:2181)
curl -sSOL https://downloads.apache.org/kafka/2.7.2/kafka_2.13-2.7.2.tgz
tar -xzf kafka_2.13-2.7.2.tgz

Örnekleri döndürmek için varsayılan yapılandırmaları (Apache Kafka tarafından sağlanır) kullanma.

./kafka_2.13-2.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.2/config/zookeeper.properties
./kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.2/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running

Örneklerini Daemon süreçler için grep olarak başlayan sonra kafka süreçleri listesinde. İki Java işlemi, zookeeper ve kafka örneklerine karşılık gelir.

ps -ef | grep kafka
kbuilder 27856 20044  4 20:28 ?        00:00:00 python /tmpfs/src/gfile/executor.py --input_notebook=/tmpfs/src/temp/docs/tutorials/kafka.ipynb --timeout=15000
kbuilder 28271     1 16 20:28 ?        00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-2.7.2/config/zookeeper.properties
kbuilder 28635     1 57 20:28 ?        00:00:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.2/bin/../config/log4j.properties -cp /tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/activation-1.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/argparse4j-0.7.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/audience-annotations-0.5.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-cli-1.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/commons-lang3-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-api-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-basic-auth-extension-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-file-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-json-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-mirror-client-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-runtime-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/connect-transforms-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-api-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-locator-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/hk2-utils-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-core-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-databind-2.10.5.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.inject-2.6.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.25.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javassist-3.26.0-GA.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.servlet-api-3.1.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jaxb-api-2.3.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-client-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-common-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-container-servlet-core-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-hk2-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jersey-server-2.34.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-client-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-http-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-io-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-security-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-server-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/jopt-simple-5.0.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-clients-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-log4j-appender-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-raft-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-examples-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-scala_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-streams-test-utils-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka-tools-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2-sources.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/kafka_2.13-2.7.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/log4j-1.2.17.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/lz4-java-1.7.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/maven-artifact-3.8.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/metrics-core-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-codec-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-handler-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/paranamer-2.8.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/plexus-utils-3.2.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/reflections-0.9.12.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/rocksdbjni-5.18.4.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-library-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-logging_2.13-3.9.2.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/scala-reflect-2.13.3.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-api-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/snappy-java-1.1.7.7.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zookeeper-jute-3.5.9.jar:/tmpfs/src/temp/docs/tutorials/kafka_2.13-2.7.2/bin/../libs/zstd-jni-1.4.5-6.jar kafka.Kafka ./kafka_2.13-2.7.2/config/server.properties
kbuilder 28821 27860  0 20:28 pts/0    00:00:00 /bin/bash -c ps -ef | grep kafka
kbuilder 28823 28821  0 20:28 pts/0    00:00:00 grep kafka

Aşağıdaki özelliklere sahip kafka konularını oluşturun:

  • susy-tren: bölümler=1, çoğaltma faktörü=1
  • susy testi: bölümler=2, çoğaltma faktörü=1
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test
Created topic susy-train.
Created topic susy-test.

Yapılandırmayla ilgili ayrıntılar için konuyu açıklayın

./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test
Topic: susy-train PartitionCount: 1 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-train Partition: 0  Leader: 0 Replicas: 0   Isr: 0
Topic: susy-test  PartitionCount: 2 ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: susy-test  Partition: 0  Leader: 0 Replicas: 0   Isr: 0
    Topic: susy-test  Partition: 1  Leader: 0 Replicas: 0   Isr: 0

Çoğaltma faktörü 1, verilerin çoğaltılmadığını gösterir. Bunun nedeni, kafka kurulumumuzda tek bir komisyoncu bulunmasıdır. Üretim sistemlerinde, önyükleme sunucularının sayısı 100'lerce düğüm aralığında olabilir. Çoğaltma kullanan hata toleransının resme geldiği yer burasıdır.

Bakınız docs Daha fazla ayrıntı için.

SUSY Veri Kümesi

Bir olay akış platformu olan Kafka, çeşitli kaynaklardan gelen verilerin içine yazılmasını sağlar. Örneğin:

  • Web trafiği günlükleri
  • astronomik ölçümler
  • IoT sensör verileri
  • Ürün incelemeleri ve daha fazlası.

Bu eğitimde amacına uygun olarak, indirmenizi sağlar SUSY veri kümesini ve elle kafka içine veri beslenirler. Bu sınıflandırma probleminin amacı, süpersimetrik parçacıklar üreten bir sinyal süreci ile üretmeyen bir arka plan süreci arasında ayrım yapmaktır.

curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

Veri kümesini keşfedin

İlk sütun sınıf etiketidir (sinyal için 1, arka plan için 0), ardından 18 özellik (8 düşük seviye özellik, sonra 10 yüksek seviye özellik). İlk 8 özellik, hızlandırıcıdaki parçacık dedektörleri tarafından ölçülen kinematik özelliklerdir. Son 10 özellik, ilk 8 özelliğin işlevleridir. Bunlar, iki sınıf arasında ayrım yapmaya yardımcı olmak için fizikçiler tarafından türetilen üst düzey özelliklerdir.

COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

Tüm veri kümesi 5 milyon satırdan oluşur. Bununla birlikte, bu öğreticinin amacı için, veri kümesinin yalnızca bir kısmını (100.000 satır) ele alalım, böylece verileri taşımak için daha az zaman harcanır ve api'nin işlevselliğini anlamak için daha fazla zaman harcanır.

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
(100000, 19)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
(54025, 45975)

Veri kümesini böl

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
Number of training samples:  60000
Number of testing sample:  40000
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
(60000, 60000, 40000, 40000)

Tren ve test verilerini kafka'da saklayın

Verileri kafka'da saklamak, eğitim ve çıkarım amacıyla sürekli uzaktan veri alımı için bir ortamı simüle eder.

def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test

tfio tren veri kümesini tanımlayın

IODataset sınıfı tensorflow içine kafka veri akışı için kullanılmaktadır. Sınıf devralır tf.data.Dataset ve böylece tüm faydalı fonksiyonları vardır tf.data.Dataset kutudan.

def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
2022-01-07 20:29:21.602817: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

Modeli oluşturun ve eğitin

# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 dense (Dense)               (None, 128)               2432      
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 256)               33024     
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 128)               32896     
                                                                 
 dropout_2 (Dropout)         (None, 128)               0         
                                                                 
 dense_3 (Dense)             (None, 1)                 129       
                                                                 
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py:1082: UserWarning: "`binary_crossentropy` received `from_logits=True`, but the `output` argument was produced by a sigmoid or softmax activation and thus does not represent logits. Was this intended?"
  return dispatch_target(*args, **kwargs)
938/938 [==============================] - 31s 33ms/step - loss: 0.4817 - accuracy: 0.7691
Epoch 2/10
938/938 [==============================] - 30s 32ms/step - loss: 0.4550 - accuracy: 0.7875
Epoch 3/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4512 - accuracy: 0.7911
Epoch 4/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4487 - accuracy: 0.7940
Epoch 5/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4466 - accuracy: 0.7934
Epoch 6/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4459 - accuracy: 0.7933
Epoch 7/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4448 - accuracy: 0.7935
Epoch 8/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4439 - accuracy: 0.7950
Epoch 9/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4421 - accuracy: 0.7956
Epoch 10/10
938/938 [==============================] - 31s 32ms/step - loss: 0.4425 - accuracy: 0.7962
<keras.callbacks.History at 0x7fb364fd2a90>

Veri kümesinin yalnızca bir kısmı kullanıldığından, eğitim aşamasında doğruluğumuz ~%78 ile sınırlıdır. Ancak, daha iyi bir model performansı için lütfen kafka'da ek veri depolamaktan çekinmeyin. Ayrıca amaç sadece tfio kafka veri setlerinin işlevselliğini göstermek olduğundan, daha küçük ve daha az karmaşık bir sinir ağı kullanıldı. Ancak, modelin karmaşıklığı arttırılabilir, öğrenme stratejisi değiştirilebilir, hiper-parametreler vb. keşif amacıyla ayarlanabilir. Temel bir yaklaşım için, bu bakınız makale .

Test verilerinden çıkarım yapın

Hataya dayanıklılık ile birlikte 'tam olarak tek-' anlam yapışan test verilerine anlaması için streaming.KafkaGroupIODataset kullanılabilir.

tfio test veri kümesini tanımlayın

stream_timeout yeni veri noktaları için verilen süre boyunca parametre blokları konuya akışının gerçekleşmesi. Bu, veriler konuya aralıklı olarak aktarılıyorsa yeni veri kümeleri oluşturma ihtiyacını ortadan kaldırır.

test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_io/python/experimental/kafka_group_io_dataset_ops.py:188: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.take_while(...)

Bu sınıf eğitim amaçlı kullanılabilse de, ele alınması gereken uyarılar vardır. Tüm mesajlar kafka okunur ve en son uzaklıklar kullanılarak işlenen sonra streaming.KafkaGroupIODataset , tüketici baştan ileti okuma yeniden başlatmaz. Böylece, eğitim sırasında, verilerin sürekli olarak içeri aktığı tek bir dönem için eğitim yapmak mümkündür. Bu tür bir işlevsellik, eğitim aşamasında sınırlı kullanım durumlarına sahiptir; burada, model tarafından bir veri noktası tüketildikten sonra artık geçerli değildir. gereklidir ve atılabilir.

Bununla birlikte, bu işlevsellik, tam olarak bir kez anlambilim ile sağlam çıkarım söz konusu olduğunda parlar.

test verileri üzerindeki performansı değerlendirmek

res = model.evaluate(test_ds)
print("test loss, test acc:", res)
34/Unknown - 0s 2ms/step - loss: 0.4434 - accuracy: 0.8194
2022-01-07 20:34:29.402707: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:29.406789: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
625/625 [==============================] - 11s 17ms/step - loss: 0.4437 - accuracy: 0.7915
test loss, test acc: [0.4436523914337158, 0.7915250062942505]
2022-01-07 20:34:40.051954: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

Çıkarım 'tam olarak bir kez' anlambilime dayandığından, test kümesindeki değerlendirme yalnızca bir kez çalıştırılabilir. Test verileri üzerinde tekrar çıkarımı yapabilmek için yeni bir tüketici grubu kullanılmalıdır.

Ofset lag takip testcg tüketici grubu

./kafka_2.13-2.7.2/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
testcg          susy-test       0          21626           21626           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka
testcg          susy-test       1          18374           18374           0               rdkafka-534f63d0-b91e-4976-a3ca-832b6c91210e /10.142.0.103   rdkafka

Bir kez current-offset maçları log-end-offset tüm bölümler için, tüketici (ler) tüm kafka konudan iletilerin tamamlamış olduğunu gösterir.

Çevrimiçi öğrenme

Çevrimiçi makine öğrenimi paradigması, geleneksel/geleneksel makine öğrenimi modellerini eğitme yolundan biraz farklıdır. İlk durumda, model, yeni veri noktaları mevcut olduğu anda parametrelerini aşamalı olarak öğrenmeye/güncellemeye devam eder ve bu sürecin süresiz olarak devam etmesi beklenir. Bu veri kümesi sabittir ikincisi yaklaşımların aksine ve model üzerinde dolaşır n kaç kez. Çevrimiçi öğrenmede, model tarafından bir kez tüketilen veriler tekrar eğitim için uygun olmayabilir.

Yararlanarak streaming.KafkaBatchIODataset , şekilde modellerini yetiştirmek artık mümkün. Bu işlevselliği göstermek için SUSY veri setimizi kullanmaya devam edelim.

Çevrimiçi öğrenme için tfio eğitim veri seti

streaming.KafkaBatchIODataset benzer streaming.KafkaGroupIODataset 's API. Ayrıca, kullanılması tavsiye edilir stream_timeout veri kümesi zaman aşımına uğramadan önce yeni mesajlar için engeller süreyi yapılandırmak için parametre. Aşağıdaki örnekte, veri kümesi bir ile yapılandırılmış stream_timeout ait 10000 milisaniye. Bu, konudaki tüm mesajlar tüketildikten sonra, veri kümesinin zaman aşımına uğramadan ve kafka kümesinden ayrılmadan önce 10 saniye daha bekleyeceği anlamına gelir. Zaman aşımına uğramadan önce konuya yeni mesajlar aktarılırsa, yeni tüketilen veri noktaları için veri tüketimi ve model eğitimi devam eder. Süresiz engellemek için, ayarlayın -1 .

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

O her öğe online_train_ds üreten bir olduğunu tf.data.Dataset kendi içinde. Böylece, tüm standart dönüşümler her zamanki gibi uygulanabilir.

def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  if len(mini_ds) > 0:
    model.fit(mini_ds, epochs=3)
2022-01-07 20:34:42.024915: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2022-01-07 20:34:42.025797: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4561 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4538 - accuracy: 0.7909
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4499 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4347 - accuracy: 0.8018
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4314 - accuracy: 0.8048
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4286 - accuracy: 0.8063
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4480 - accuracy: 0.7910
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4425 - accuracy: 0.7945
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4390 - accuracy: 0.7970
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4434 - accuracy: 0.7965
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4380 - accuracy: 0.7974
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4354 - accuracy: 0.7992
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4522 - accuracy: 0.7909
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4475 - accuracy: 0.7910
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4435 - accuracy: 0.7947
Epoch 1/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4464 - accuracy: 0.7906
Epoch 2/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4467 - accuracy: 0.7922
Epoch 3/3
313/313 [==============================] - 1s 2ms/step - loss: 0.4424 - accuracy: 0.7933
2022-01-07 20:35:04.916208: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out

Kademeli olarak eğitilmiş model, periyodik olarak (kullanım durumlarına dayalı olarak) kaydedilebilir ve hem çevrimiçi hem de çevrimdışı modlarda test verilerini çıkarmak için kullanılabilir.

Referanslar: