पायस्पार्क पांडा_यूडीएफ ()

Payasparka Panda Yudi Epha



pandas_udf() फ़ंक्शन का उपयोग करके PySpark DataFrame को बदलना संभव है। यह एक उपयोगकर्ता परिभाषित फ़ंक्शन है जो तीर के साथ PySpark DataFrame पर लागू होता है। हम pandas_udf () का उपयोग करके सदिश संचालन कर सकते हैं। इस फ़ंक्शन को डेकोरेटर के रूप में पास करके इसे लागू किया जा सकता है। सिंटैक्स, पैरामीटर और विभिन्न उदाहरणों को जानने के लिए आइए इस गाइड में गोता लगाएँ।

सामग्री का विषय:

यदि आप PySpark DataFrame और मॉड्यूल इंस्टॉलेशन के बारे में जानना चाहते हैं, तो इसे देखें लेख .







Pyspark.sql.functions.pandas_udf()

pandas_udf () PySpark में sql.functions मॉड्यूल में उपलब्ध है जिसे 'से' कीवर्ड का उपयोग करके आयात किया जा सकता है। इसका उपयोग हमारे PySpark DataFrame पर वेक्टरकृत संचालन करने के लिए किया जाता है। यह फ़ंक्शन तीन पैरामीटर पास करके डेकोरेटर की तरह कार्यान्वित किया जाता है। उसके बाद, हम एक उपयोगकर्ता-परिभाषित फ़ंक्शन बना सकते हैं जो एक तीर का उपयोग करके वेक्टर प्रारूप में डेटा लौटाता है (जैसे हम इसके लिए श्रृंखला/NumPy का उपयोग करते हैं)। इस फ़ंक्शन के भीतर, हम परिणाम वापस करने में सक्षम हैं।



संरचना और वाक्य-विन्यास:



सबसे पहले, आइए इस फ़ंक्शन की संरचना और सिंटैक्स देखें:

@pandas_udf(डेटाटाइप)
def function_name (ऑपरेशन) -> Convert_format:
वापसी कथन

यहाँ, function_name हमारे परिभाषित फ़ंक्शन का नाम है। डेटा प्रकार उस डेटा प्रकार को निर्दिष्ट करता है जो इस फ़ंक्शन द्वारा लौटाया जाता है। हम “रिटर्न” कीवर्ड का उपयोग करके परिणाम वापस कर सकते हैं। सभी ऑपरेशन फंक्शन के अंदर एरो असाइनमेंट के साथ किए जाते हैं।





पांडा_उदफ (फ़ंक्शन और रिटर्न टाइप)

  1. पहला पैरामीटर उपयोगकर्ता-परिभाषित फ़ंक्शन है जो इसे पास किया गया है।
  2. दूसरे पैरामीटर का उपयोग फ़ंक्शन से रिटर्न डेटा प्रकार निर्दिष्ट करने के लिए किया जाता है।

आंकड़े:

इस संपूर्ण गाइड में, हम प्रदर्शन के लिए केवल एक PySpark DataFrame का उपयोग करते हैं। हमारे द्वारा परिभाषित सभी उपयोगकर्ता परिभाषित कार्य इस PySpark DataFrame पर लागू होते हैं। सुनिश्चित करें कि आप इस DataFrame को PySpark की स्थापना के बाद पहले अपने वातावरण में बनाते हैं।



pyspark आयात करें

pyspark.sql से SparkSession आयात करें

linuxhint_spark_app = SparkSession.builder.appName ( 'लिनक्स संकेत' .getOrCreate()

pyspark.sql.functions से pandas_udf आयात करें

pyspark.sql.types आयात से *

पांडा को पांडा के रूप में आयात करें

# सब्जी का विवरण

सब्जी = [{ 'प्रकार' : 'सब्ज़ी' , 'नाम' : 'टमाटर' , 'पता लगाएँ_देश' : 'अमेरीका' , 'मात्रा' : 800 },

{ 'प्रकार' : 'फल' , 'नाम' : 'केला' , 'पता लगाएँ_देश' : 'चीन' , 'मात्रा' : बीस },

{ 'प्रकार' : 'सब्ज़ी' , 'नाम' : 'टमाटर' , 'पता लगाएँ_देश' : 'अमेरीका' , 'मात्रा' : 800 },

{ 'प्रकार' : 'सब्ज़ी' , 'नाम' : 'आम' , 'पता लगाएँ_देश' : 'जापान' , 'मात्रा' : 0 },

{ 'प्रकार' : 'फल' , 'नाम' : 'नींबू' , 'पता लगाएँ_देश' : 'भारत' , 'मात्रा' : 1700 },

{ 'प्रकार' : 'सब्ज़ी' , 'नाम' : 'टमाटर' , 'पता लगाएँ_देश' : 'अमेरीका' , 'मात्रा' : 1200 },

{ 'प्रकार' : 'सब्ज़ी' , 'नाम' : 'आम' , 'पता लगाएँ_देश' : 'जापान' , 'मात्रा' : 0 },

{ 'प्रकार' : 'फल' , 'नाम' : 'नींबू' , 'पता लगाएँ_देश' : 'भारत' , 'मात्रा' : 0 }

]

# उपरोक्त डेटा से मार्केट डेटाफ़्रेम बनाएं

Market_df = linuxhint_spark_app.createDataFrame (सब्जी)

मार्केट_डीएफ.शो ()

आउटपुट:

यहाँ, हम इस DataFrame को 4 कॉलम और 8 पंक्तियों के साथ बनाते हैं। अब, हम उपयोगकर्ता परिभाषित कार्यों को बनाने और उन्हें इन स्तंभों पर लागू करने के लिए pandas_udf() का उपयोग करते हैं।

pandas_udf() विभिन्न डेटा प्रकारों के साथ

इस परिदृश्य में, हम pandas_udf() के साथ कुछ यूज़र-डिफ़ाइंड फ़ंक्शन बनाते हैं और उन्हें कॉलम पर लागू करते हैं और चयन() विधि का उपयोग करके परिणाम प्रदर्शित करते हैं। प्रत्येक मामले में, हम pandas.Series का उपयोग करते हैं क्योंकि हम सदिश संचालन करते हैं। यह कॉलम मानों को एक-आयामी सरणी के रूप में मानता है और ऑपरेशन कॉलम पर लागू होता है। डेकोरेटर में ही, हम फंक्शन रिटर्न टाइप निर्दिष्ट करते हैं।

उदाहरण 1: पांडास_उदफ () स्ट्रिंग प्रकार के साथ

यहां, हम स्ट्रिंग प्रकार के कॉलम मानों को अपरकेस और लोअरकेस में बदलने के लिए स्ट्रिंग रिटर्न प्रकार के साथ दो उपयोगकर्ता-परिभाषित फ़ंक्शन बनाते हैं। अंत में, हम इन कार्यों को 'प्रकार' और 'पता_देश' कॉलम पर लागू करते हैं।

# pandas_udf के साथ टाइप कॉलम को अपर केस में बदलें

@pandas_udf(स्ट्रिंगटाइप ())

def type_upper_case (i: panda.Series) -> panda.Series:

वापसी i.str.upper ()

# पांडा_यूडीएफ के साथ लोकेट_कंट्री कॉलम को लोअरकेस में बदलें

@pandas_udf(स्ट्रिंगटाइप ())

डीफ़ कंट्री_लोअर_केस (i: panda.Series) -> panda.Series:

वापसी i.str.lower ()

# चयन का उपयोग करके कॉलम प्रदर्शित करें ()

बाजार_डीएफ.चयन करें ( 'प्रकार' , type_upper_case ( 'प्रकार' ), 'पता लगाएँ_देश' ,
देश_लोअर_केस ( 'पता लगाएँ_देश' ))।दिखाना()

आउटपुट:

व्याख्या:

StringType() फ़ंक्शन pyspark.sql.types मॉड्यूल में उपलब्ध है। PySpark DataFrame बनाते समय हमने पहले ही इस मॉड्यूल को इम्पोर्ट कर लिया था।

  1. सबसे पहले, UDF (यूज़र-डिफ़ाइंड फ़ंक्शन) str.upper() फ़ंक्शन का उपयोग करके स्ट्रिंग्स को अपरकेस में लौटाता है। str.upper() श्रृंखला डेटा संरचना में उपलब्ध है (जैसा कि हम फ़ंक्शन के अंदर एक तीर के साथ श्रृंखला में परिवर्तित कर रहे हैं) जो दिए गए स्ट्रिंग को अपरकेस में परिवर्तित करता है। अंत में, यह फ़ंक्शन 'टाइप' कॉलम पर लागू होता है जो चयन () विधि के अंदर निर्दिष्ट होता है। पहले, टाइप कॉलम में सभी स्ट्रिंग्स लोअरकेस में होती हैं। अब, उन्हें अपरकेस में बदल दिया गया है।
  2. दूसरा, UDF str.lower() फ़ंक्शन का उपयोग करके स्ट्रिंग्स को अपरकेस में लौटाता है। str.lower() सीरीज डेटा स्ट्रक्चर में उपलब्ध है जो दिए गए स्ट्रिंग को लोअरकेस में कनवर्ट करता है। अंत में, यह फ़ंक्शन 'टाइप' कॉलम पर लागू होता है जो चयन () विधि के अंदर निर्दिष्ट होता है। पहले, प्रकार स्तंभ में सभी स्ट्रिंग अपरकेस में होती हैं. अब, उन्हें लोअरकेस में बदल दिया गया है।

उदाहरण 2: पांडास_उदफ () पूर्णांक प्रकार के साथ

चलिए एक UDF बनाते हैं जो PySpark DataFrame पूर्णांक कॉलम को पांडा श्रृंखला में परिवर्तित करता है और प्रत्येक मान में 100 जोड़ता है। इस फ़ंक्शन को चयन () विधि के अंदर 'मात्रा' कॉलम पास करें।

# 100 जोड़ें

@pandas_udf(इंटीजर टाइप ())

def add_100(i: panda.Series) -> panda.Series:

आई + लौटाओ 100

# उपरोक्त फ़ंक्शन के लिए मात्रा कॉलम पास करें और प्रदर्शित करें।

बाजार_डीएफ.चयन करें ( 'मात्रा' ,जोड़ें_100( 'मात्रा' ))।दिखाना()

आउटपुट:

व्याख्या:

यूडीएफ के अंदर, हम सभी मूल्यों को पुनरावृत्त करते हैं और उन्हें श्रृंखला में परिवर्तित करते हैं। उसके बाद, हम श्रृंखला में प्रत्येक मान में 100 जोड़ते हैं। अंत में, हम इस फ़ंक्शन को 'मात्रा' कॉलम पास करते हैं और हम देख सकते हैं कि 100 सभी मानों में जोड़ा गया है।

पांडास_यूडीएफ () ग्रुपबी () और एजीजी () का उपयोग करके विभिन्न डेटा प्रकारों के साथ

आइए यूडीएफ को एकत्रित कॉलम में पास करने के उदाहरण देखें। यहां, कॉलम मानों को पहले ग्रुपबी () फ़ंक्शन का उपयोग करके समूहीकृत किया जाता है और एग () फ़ंक्शन का उपयोग करके एकत्रीकरण किया जाता है। हम अपने UDF को इस समग्र कार्य के अंदर पास करते हैं।

वाक्य - विन्यास:

pyspark_dataframe_object.groupby( 'ग्रुपिंग_कॉलम' .एजीजी (यूडीएफ
(pyspark_dataframe_object [ 'कॉलम' ]))

यहां, ग्रुपिंग कॉलम में मानों को पहले समूहीकृत किया जाता है। फिर, हमारे यूडीएफ के संबंध में प्रत्येक समूहीकृत डेटा पर एकत्रीकरण किया जाता है।

उदाहरण 1: कुल माध्य के साथ पांडास_उदफ ()

यहां, हम रिटर्न टाइप फ्लोट के साथ यूजर-डिफ़ाइंड फ़ंक्शन बनाते हैं। फ़ंक्शन के अंदर, हम माध्य () फ़ंक्शन का उपयोग करके औसत की गणना करते हैं। प्रत्येक प्रकार के लिए औसत मात्रा प्राप्त करने के लिए यह यूडीएफ 'मात्रा' कॉलम में पास किया जाता है।

# माध्य / औसत लौटाएँ

@pandas_udf( 'तैरना' )

def औसत_फंक्शन (i: panda.Series) -> फ्लोट:

वापसी i.mean ()

# टाइप कॉलम को ग्रुप करके फंक्शन में क्वांटिटी कॉलम पास करें।

मार्केट_डीएफ.ग्रुपबी ( 'प्रकार' .एजीजी (औसत_फंक्शन (बाजार_डीएफ) 'मात्रा' ]))।दिखाना()

आउटपुट:

हम 'प्रकार' कॉलम में तत्वों के आधार पर समूहीकरण कर रहे हैं। दो समूह बनते हैं - 'फल' और 'सब्जी'। प्रत्येक समूह के लिए, माध्य की गणना की जाती है और लौटाया जाता है।

उदाहरण 2: कुल अधिकतम () और न्यूनतम () के साथ पांडा_उदफ ()

यहां, हम पूर्णांक (int) रिटर्न प्रकार के साथ दो यूज़र-डिफ़ाइंड फ़ंक्शन बनाते हैं। पहला यूडीएफ न्यूनतम मूल्य लौटाता है और दूसरा यूडीएफ अधिकतम मूल्य लौटाता है।

# pandas_udf जो न्यूनतम मान लौटाता है

@pandas_udf( 'इंट' )

def min_(i: panda.Series) -> int:

वापसी i.min ()

# pandas_udf जो अधिकतम मूल्य लौटाता है

@pandas_udf( 'इंट' )

def max_(i: panda.Series) -> int:

वापसी i.max ()

# Find_country को समूहीकृत करके min_ pandas_udf को मात्रा कॉलम पास करें।

मार्केट_डीएफ.ग्रुपबी ( 'पता लगाएँ_देश' .एजीजी(न्यूनतम_(मार्केट_डीएफ[ 'मात्रा' ]))।दिखाना()

# Find_country को समूहीकृत करके मात्रा कॉलम को max_ pandas_udf पर पास करें।

मार्केट_डीएफ.ग्रुपबी ( 'पता लगाएँ_देश' .एजीजी (अधिकतम_ (बाजार_डीएफ [ 'मात्रा' ]))।दिखाना()

आउटपुट:

न्यूनतम और अधिकतम मान वापस करने के लिए, हम यूडीएफ के रिटर्न प्रकार में न्यूनतम () और अधिकतम () फ़ंक्शन का उपयोग करते हैं। अब, हम डेटा को 'locate_country' कॉलम में समूहित करते हैं। चार समूह बनते हैं ('चीन', 'भारत', 'जापान', 'यूएसए')। प्रत्येक समूह के लिए, हम अधिकतम मात्रा लौटाते हैं। इसी तरह, हम न्यूनतम मात्रा लौटाते हैं।

निष्कर्ष

मूल रूप से, pandas_udf () का उपयोग हमारे PySpark DataFrame पर सदिश संचालन करने के लिए किया जाता है। हमने देखा है कि pandas_udf() कैसे बनाएं और इसे PySpark DataFrame पर कैसे लागू करें। बेहतर समझ के लिए, हमने सभी डेटाटाइप्स (स्ट्रिंग, फ्लोट और इंटीजर) पर विचार करके विभिन्न उदाहरणों पर चर्चा की। agg() फ़ंक्शन के माध्यम से groupby() के साथ pandas_udf() का उपयोग करना संभव हो सकता है।