पायथन में रीयल-टाइम डेटा स्ट्रीमिंग कैसे कार्यान्वित करें

Payathana Mem Riyala Ta Ima Deta Striminga Kaise Karyanvita Karem



पायथन में वास्तविक समय डेटा स्ट्रीमिंग के कार्यान्वयन में महारत हासिल करना आज की डेटा से जुड़ी दुनिया में एक आवश्यक कौशल के रूप में कार्य करता है। यह मार्गदर्शिका पायथन में प्रामाणिकता के साथ वास्तविक समय डेटा स्ट्रीमिंग का उपयोग करने के लिए मुख्य चरणों और आवश्यक उपकरणों की पड़ताल करती है। अपाचे काफ्का या अपाचे पल्सर जैसे उपयुक्त ढांचे का चयन करने से लेकर सहज डेटा खपत, प्रसंस्करण और प्रभावी विज़ुअलाइज़ेशन के लिए पायथन कोड लिखने तक, हम चुस्त और कुशल वास्तविक समय डेटा चैनल बनाने के लिए आवश्यक कौशल हासिल करेंगे।

उदाहरण 1: पायथन में रीयल-टाइम डेटा स्ट्रीमिंग का कार्यान्वयन

आज के डेटा-संचालित युग और दुनिया में पायथन में वास्तविक समय डेटा स्ट्रीमिंग को लागू करना महत्वपूर्ण है। इस विस्तृत उदाहरण में, हम Google Colab में Apache Kafka और Python का उपयोग करके एक वास्तविक समय डेटा स्ट्रीमिंग सिस्टम बनाने की प्रक्रिया से गुजरेंगे।







कोडिंग शुरू करने से पहले उदाहरण आरंभ करने के लिए, Google Colab में एक विशिष्ट वातावरण बनाना आवश्यक है। पहली चीज़ जो हमें करने की ज़रूरत है वह आवश्यक पुस्तकालयों को स्थापित करना है। हम काफ्का एकीकरण के लिए 'काफ्का-पायथन' लाइब्रेरी का उपयोग करते हैं।



! रंज स्थापित करना काफ्का-पायथन


यह कमांड 'काफ्का-पायथन' लाइब्रेरी स्थापित करता है जो अपाचे काफ्का के लिए पायथन फ़ंक्शन और बाइंडिंग प्रदान करता है। इसके बाद, हम अपने प्रोजेक्ट के लिए आवश्यक लाइब्रेरी आयात करते हैं। 'काफ्काप्रोड्यूसर' और 'काफ्काकंस्यूमर' सहित आवश्यक पुस्तकालयों को आयात करना 'काफ्का-पायथन' लाइब्रेरी से कक्षाएं हैं जो हमें काफ्का दलालों के साथ बातचीत करने की अनुमति देती हैं। JSON, JSON डेटा के साथ काम करने के लिए पायथन लाइब्रेरी है जिसका उपयोग हम संदेशों को क्रमबद्ध और डीसेरिएलाइज़ करने के लिए करते हैं।



काफ्का से आयात काफ्काप्रोड्यूसर, काफ्काउपभोक्ता
json आयात करें


काफ्का निर्माता का निर्माण





यह महत्वपूर्ण है क्योंकि काफ्का निर्माता डेटा को काफ्का विषय पर भेजता है। हमारे उदाहरण में, हम 'वास्तविक समय-विषय' नामक विषय पर सिम्युलेटेड वास्तविक समय डेटा भेजने के लिए एक निर्माता बनाते हैं।

हम एक 'काफ्काप्रोड्यूसर' उदाहरण बनाते हैं जो काफ्का ब्रोकर का पता 'लोकलहोस्ट:9092' निर्दिष्ट करता है। फिर, हम 'value_serializer' का उपयोग करते हैं, एक फ़ंक्शन जो काफ्का को भेजने से पहले डेटा को क्रमबद्ध करता है। हमारे मामले में, एक लैम्ब्डा फ़ंक्शन डेटा को UTF-8-एन्कोडेड JSON के रूप में एन्कोड करता है। अब, आइए कुछ वास्तविक समय डेटा का अनुकरण करें और इसे काफ्का विषय पर भेजें।



निर्माता = काफ्कानिर्माता ( बूटस्ट्रैप_सर्वर = 'लोकलहोस्ट:9092' ,
value_serializer =लैम्ब्डा वी: json.dumps ( में ) .एन्कोड ( 'यूटीएफ-8' ) )
# सिम्युलेटेड वास्तविक समय डेटा
डेटा = { 'सेंसर_आईडी' : 1 , 'तापमान' : 25.5 , 'नमी' : 60.2 }
#विषय पर डेटा भेजना
निर्माता.भेजें ( 'वास्तविक समय-विषय' , डेटा )


इन पंक्तियों में, हम एक 'डेटा' शब्दकोश को परिभाषित करते हैं जो एक सिम्युलेटेड सेंसर डेटा का प्रतिनिधित्व करता है। फिर हम इस डेटा को 'वास्तविक समय-विषय' पर प्रकाशित करने के लिए 'भेजें' विधि का उपयोग करते हैं।

फिर, हम एक काफ्का उपभोक्ता बनाना चाहते हैं, और एक काफ्का उपभोक्ता काफ्का विषय से डेटा पढ़ता है। हम 'वास्तविक-समय-विषय' में संदेशों का उपभोग और प्रसंस्करण करने के लिए एक उपभोक्ता बनाते हैं। हम एक 'काफ्का उपभोक्ता' उदाहरण बनाते हैं, उस विषय को निर्दिष्ट करते हैं जिसका हम उपभोग करना चाहते हैं, उदाहरण के लिए, (वास्तविक समय-विषय) और काफ्का ब्रोकर का पता। फिर, 'value_deserializer' एक फ़ंक्शन है जो काफ्का से प्राप्त डेटा को डिसेरिएलाइज़ करता है। हमारे मामले में, एक लैम्ब्डा फ़ंक्शन डेटा को UTF-8-एन्कोडेड JSON के रूप में डीकोड करता है।

उपभोक्ता = काफ्काउपभोक्ता ( 'वास्तविक समय-विषय' ,
बूटस्ट्रैप_सर्वर = 'लोकलहोस्ट:9092' ,
value_deserializer =लैम्ब्डा x: json.loads ( x.डिकोड ( 'यूटीएफ-8' ) ) )


हम विषय से संदेशों को लगातार उपभोग और संसाधित करने के लिए एक पुनरावृत्त लूप का उपयोग करते हैं।

# वास्तविक समय डेटा को पढ़ना और संसाधित करना
के लिए संदेश में उपभोक्ता:
डेटा = संदेश.मूल्य
छपाई ( एफ 'प्राप्त डेटा: {डेटा}' )


हम लूप के अंदर प्रत्येक संदेश का मूल्य और हमारे सिम्युलेटेड सेंसर डेटा को पुनः प्राप्त करते हैं और इसे कंसोल पर प्रिंट करते हैं। काफ्का निर्माता और उपभोक्ता को चलाने में इस कोड को Google Colab में चलाना और कोड कोशिकाओं को व्यक्तिगत रूप से निष्पादित करना शामिल है। निर्माता सिम्युलेटेड डेटा को काफ्का विषय पर भेजता है, और उपभोक्ता प्राप्त डेटा को पढ़ता है और प्रिंट करता है।


कोड रन के रूप में आउटपुट का विश्लेषण

हम वास्तविक समय के उस डेटा का निरीक्षण करेंगे जिसका उत्पादन और उपभोग किया जा रहा है। डेटा प्रारूप हमारे सिमुलेशन या वास्तविक डेटा स्रोत के आधार पर भिन्न हो सकता है। इस विस्तृत उदाहरण में, हम Google Colab में Apache Kafka और Python का उपयोग करके वास्तविक समय डेटा स्ट्रीमिंग सिस्टम स्थापित करने की पूरी प्रक्रिया को कवर करते हैं। हम इस प्रणाली के निर्माण में कोड की प्रत्येक पंक्ति और उसके महत्व को समझाएंगे। वास्तविक समय डेटा स्ट्रीमिंग एक शक्तिशाली क्षमता है, और यह उदाहरण अधिक जटिल वास्तविक दुनिया अनुप्रयोगों के लिए आधार के रूप में कार्य करता है।

उदाहरण 2: स्टॉक मार्केट डेटा का उपयोग करके पायथन में रीयल-टाइम डेटा स्ट्रीमिंग लागू करना

आइए एक अलग परिदृश्य का उपयोग करके पायथन में वास्तविक समय डेटा स्ट्रीमिंग को लागू करने का एक और अनूठा उदाहरण देखें; इस बार हम स्टॉक मार्केट डेटा पर फोकस करेंगे। हम एक वास्तविक समय डेटा स्ट्रीमिंग सिस्टम बनाते हैं जो स्टॉक मूल्य परिवर्तनों को कैप्चर करता है और Google Colab में Apache Kafka और Python का उपयोग करके उन्हें संसाधित करता है। जैसा कि पिछले उदाहरण में दिखाया गया है, हम Google Colab में अपने परिवेश को कॉन्फ़िगर करके प्रारंभ करते हैं। सबसे पहले, हम आवश्यक लाइब्रेरी स्थापित करते हैं:

! रंज स्थापित करना काफ्का-पायथन वाईफाइनेंस


यहां, हम 'yfinance' लाइब्रेरी जोड़ते हैं जो हमें वास्तविक समय के शेयर बाजार डेटा प्राप्त करने की अनुमति देता है। अगला, हम आवश्यक पुस्तकालय आयात करते हैं। हम काफ्का इंटरैक्शन के लिए 'काफ्का-पायथन' लाइब्रेरी से 'काफ्काप्रोड्यूसर' और 'काफ्काकंज्यूमर' कक्षाओं का उपयोग करना जारी रखते हैं। हम JSON डेटा के साथ काम करने के लिए JSON आयात करते हैं। हम वास्तविक समय के शेयर बाजार डेटा प्राप्त करने के लिए 'yfinance' का भी उपयोग करते हैं। हम वास्तविक समय के अपडेट को अनुकरण करने के लिए समय विलंब जोड़ने के लिए 'समय' लाइब्रेरी भी आयात करते हैं।

काफ्का से आयात काफ्काप्रोड्यूसर, काफ्काउपभोक्ता
json आयात करें
yfinance आयात करें जैसा yf
आयात समय


अब, हम स्टॉक डेटा के लिए एक काफ्का निर्माता बनाते हैं। हमारा काफ्का निर्माता वास्तविक समय का स्टॉक डेटा प्राप्त करता है और इसे 'स्टॉक-प्राइस' नामक काफ्का विषय पर भेजता है।

निर्माता = काफ्कानिर्माता ( बूटस्ट्रैप_सर्वर = 'लोकलहोस्ट:9092' ,
value_serializer =लैम्ब्डा वी: json.dumps ( में ) .एन्कोड ( 'यूटीएफ-8' ) )

जबकि सत्य:
स्टॉक = yf.टिकर ( 'एएपीएल' ) # उदाहरण: एप्पल इंक. स्टॉक
स्टॉक_डेटा = स्टॉक.इतिहास ( अवधि = '1डी' )
अंतिम कीमत = स्टॉक_डेटा [ 'बंद करना' ] .iloc [ - 1 ]
डेटा = { 'प्रतीक' : 'एएपीएल' , 'कीमत' : अंतिम मूल्य }
निर्माता.भेजें ( 'शेयर की कीमत' , डेटा )
समय पर सोये ( 10 ) # हर 10 सेकंड में वास्तविक समय अपडेट का अनुकरण करें


हम इस कोड में काफ्का ब्रोकर के पते के साथ एक 'काफ्काप्रोड्यूसर' उदाहरण बनाते हैं। लूप के अंदर, हम Apple Inc. ('AAPL') के लिए नवीनतम स्टॉक मूल्य प्राप्त करने के लिए 'yfinance' का उपयोग करते हैं। फिर, हम अंतिम समापन मूल्य निकालते हैं और इसे 'स्टॉक-मूल्य' विषय पर भेजते हैं। अंततः, हम हर 10 सेकंड में वास्तविक समय के अपडेट को अनुकरण करने के लिए एक समय विलंब की शुरुआत करते हैं।

आइए 'स्टॉक-प्राइस' विषय से स्टॉक मूल्य डेटा को पढ़ने और संसाधित करने के लिए एक काफ्का उपभोक्ता बनाएं।

उपभोक्ता = काफ्काउपभोक्ता ( 'शेयर की कीमत' ,
बूटस्ट्रैप_सर्वर = 'लोकलहोस्ट:9092' ,
value_deserializer =लैम्ब्डा x: json.loads ( x.डिकोड ( 'यूटीएफ-8' ) ) )

के लिए संदेश में उपभोक्ता:
स्टॉक_डेटा = संदेश.मूल्य
छपाई ( एफ 'प्राप्त स्टॉक डेटा: {स्टॉक_डेटा['प्रतीक']} - मूल्य: {स्टॉक_डेटा['मूल्य']}' )


यह कोड पिछले उदाहरण के उपभोक्ता सेटअप के समान है। यह 'स्टॉक-मूल्य' विषय से संदेशों को लगातार पढ़ता और संसाधित करता है और स्टॉक प्रतीक और मूल्य को कंसोल पर प्रिंट करता है। हम कोड सेल को क्रमिक रूप से निष्पादित करते हैं, उदाहरण के लिए, निर्माता और उपभोक्ता को चलाने के लिए Google Colab में एक-एक करके। निर्माता वास्तविक समय स्टॉक मूल्य अपडेट प्राप्त करता है और भेजता है जबकि उपभोक्ता इस डेटा को पढ़ता और प्रदर्शित करता है।

! रंज स्थापित करना काफ्का-पायथन वाईफाइनेंस
काफ्का से आयात काफ्काप्रोड्यूसर, काफ्काउपभोक्ता
json आयात करें
yfinance आयात करें जैसा yf
आयात समय
निर्माता = काफ्कानिर्माता ( बूटस्ट्रैप_सर्वर = 'लोकलहोस्ट:9092' ,
value_serializer =लैम्ब्डा वी: json.dumps ( में ) .एन्कोड ( 'यूटीएफ-8' ) )

जबकि सत्य:
स्टॉक = yf.टिकर ( 'एएपीएल' ) # एप्पल इंक. स्टॉक
स्टॉक_डेटा = स्टॉक.इतिहास ( अवधि = '1डी' )
अंतिम कीमत = स्टॉक_डेटा [ 'बंद करना' ] .iloc [ - 1 ]

डेटा = { 'प्रतीक' : 'एएपीएल' , 'कीमत' : अंतिम मूल्य }

निर्माता.भेजें ( 'शेयर की कीमत' , डेटा )

समय पर सोये ( 10 ) # हर 10 सेकंड में वास्तविक समय अपडेट का अनुकरण करें
उपभोक्ता = काफ्काउपभोक्ता ( 'शेयर की कीमत' ,
बूटस्ट्रैप_सर्वर = 'लोकलहोस्ट:9092' ,
value_deserializer =लैम्ब्डा x: json.loads ( x.डिकोड ( 'यूटीएफ-8' ) ) )

के लिए संदेश में उपभोक्ता:
स्टॉक_डेटा = संदेश.मूल्य
छपाई ( एफ 'प्राप्त स्टॉक डेटा: {स्टॉक_डेटा['प्रतीक']} - मूल्य: {स्टॉक_डेटा['मूल्य']}' )


कोड चलने के बाद आउटपुट के विश्लेषण में, हम ऐप्पल इंक के उत्पादन और उपभोग के वास्तविक समय के स्टॉक मूल्य अपडेट का निरीक्षण करेंगे।

निष्कर्ष

इस अनूठे उदाहरण में, हमने स्टॉक मार्केट डेटा को कैप्चर करने और संसाधित करने के लिए अपाचे काफ्का और 'yfinance' लाइब्रेरी का उपयोग करके पायथन में वास्तविक समय डेटा स्ट्रीमिंग के कार्यान्वयन का प्रदर्शन किया। हमने कोड की प्रत्येक पंक्ति को अच्छी तरह समझाया। वित्त, IoT और अन्य में वास्तविक दुनिया के अनुप्रयोगों के निर्माण के लिए वास्तविक समय डेटा स्ट्रीमिंग को विभिन्न क्षेत्रों में लागू किया जा सकता है।