PySpark में तालिका डेटा कैसे पढ़ें और लिखें

Pyspark Mem Talika Deta Kaise Parhem Aura Likhem



यदि डेटा को तालिका के रूप में लोड किया जाता है, तो PySpark में डेटा प्रोसेसिंग तेज़ हो जाती है। इससे SQl Expressions के इस्तेमाल से प्रोसेसिंग जल्दी होगी। इसलिए, प्रसंस्करण के लिए भेजने से पहले PySpark DataFrame/RDD को तालिका में परिवर्तित करना बेहतर तरीका है। आज, हम देखेंगे कि कैसे PySpark DataFrame में तालिका डेटा को पढ़ना है, PySpark DataFrame को तालिका में लिखना है, और अंतर्निहित कार्यों का उपयोग करके मौजूदा तालिका में नया DataFrame सम्मिलित करना है। चल दर!

Pyspark.sql.DataFrameWriter.saveAsTable ()

सबसे पहले, हम देखेंगे कि कैसे मौजूदा PySpark DataFrame को तालिका में लिखने के लिए write.saveAsTable () फ़ंक्शन का उपयोग करना है। तालिका में डेटाफ़्रेम लिखने के लिए यह तालिका का नाम और अन्य वैकल्पिक पैरामीटर जैसे मोड, पार्टिशनबी, आदि लेता है। इसे लकड़ी की फाइल के रूप में संग्रहित किया जाता है।

वाक्य - विन्यास:







dataframe_obj.write.saveAsTable (पथ/तालिका_नाम, मोड, विभाजन द्वारा, ...)
  1. Table_name उस तालिका का नाम है जो dataframe_obj से बनाई गई है।
  2. हम मोड पैरामीटर का उपयोग करके तालिका के डेटा को जोड़/ओवरराइट कर सकते हैं।
  3. पार्टीशनबी इन प्रदान किए गए कॉलम में मानों के आधार पर विभाजन बनाने के लिए एकल/एकाधिक कॉलम लेता है।

उदाहरण 1:

5 पंक्तियों और 4 स्तंभों के साथ एक PySpark DataFrame बनाएँ। इस डेटाफ़्रेम को 'Agri_Table1' नामक तालिका में लिखें।



pyspark आयात करें

pyspark.sql से SparkSession आयात करें

linuxhint_spark_app = SparkSession.builder.appName ( 'लिनक्स संकेत' .getOrCreate()

# खेती डेटा 5 पंक्तियों और 5 कॉलम के साथ

कृषि = [{ 'मिट्टी के प्रकार' : 'काला' , 'सिंचाई_उपलब्धता' : 'नहीं' , 'एकड़' : 2500 , 'मृदा_स्थिति' : 'सूखा' ,
'देश' : 'अमेरीका' },

{ 'मिट्टी के प्रकार' : 'काला' , 'सिंचाई_उपलब्धता' : 'हाँ' , 'एकड़' : 3500 , 'मृदा_स्थिति' : 'गीला' ,
'देश' : 'भारत' },

{ 'मिट्टी के प्रकार' : 'लाल' , 'सिंचाई_उपलब्धता' : 'हाँ' , 'एकड़' : 210 , 'मृदा_स्थिति' : 'सूखा' ,
'देश' : 'यूके' },

{ 'मिट्टी के प्रकार' : 'अन्य' , 'सिंचाई_उपलब्धता' : 'नहीं' , 'एकड़' : 1000 , 'मृदा_स्थिति' : 'गीला' ,
'देश' : 'अमेरीका' },

{ 'मिट्टी के प्रकार' : 'रेत' , 'सिंचाई_उपलब्धता' : 'नहीं' , 'एकड़' : 500 , 'मृदा_स्थिति' : 'सूखा' ,
'देश' : 'भारत' }]



# उपरोक्त डेटा से डेटाफ़्रेम बनाएं

agri_df = linuxhint_spark_app.createDataFrame (कृषि)

agri_df.शो ()

# उपरोक्त डेटाफ़्रेम को तालिका में लिखें।

agri_df.coalesce ( 1 .write.saveAsTable ( 'एग्री_टेबल1' )

आउटपुट:







हम देख सकते हैं कि पिछले PySpark डेटा के साथ एक लकड़ी की छत फ़ाइल बनाई गई है।



उदाहरण 2:

पिछले DataFrame पर विचार करें और 'देश' कॉलम में मानों के आधार पर रिकॉर्ड को विभाजित करके तालिका में 'Agri_Table2' लिखें।

# ऊपर दिए गए DataFrame को विभाजन के साथ तालिका में लिखें

agri_df.write.saveAsTable ( 'एग्री_टेबल2' विभाजन द्वारा = [ 'देश' ])

आउटपुट:

'देश' कॉलम में तीन अद्वितीय मान हैं - 'भारत', 'यूके', और 'यूएसए'। तो, तीन विभाजन बनते हैं। प्रत्येक विभाजन लकड़ी की छत फ़ाइलें रखती है।

Pyspark.sql.DataFrameReader.table ()

स्पार्क.रीड.टेबल () फ़ंक्शन का उपयोग करके तालिका को PySpark DataFrame में लोड करते हैं। यह केवल एक पैरामीटर लेता है जो पथ/तालिका का नाम है। यह सीधे तालिका को PySpark DataFrame में लोड करता है और PySpark DataFrame पर लागू होने वाले सभी SQL फ़ंक्शंस को इस लोड किए गए DataFrame पर भी लागू किया जा सकता है।

वाक्य - विन्यास:

spark_app.read.table(पथ/'Table_name')

इस परिदृश्य में, हम पिछली तालिका का उपयोग करते हैं जो कि PySpark DataFrame से बनाई गई थी। सुनिश्चित करें कि आपको अपने परिवेश में पिछले परिदृश्य कोड स्निपेट्स को लागू करने की आवश्यकता है।

उदाहरण:

'Agri_Table1' टेबल को 'loaded_data' नाम के DataFrame में लोड करें।

लोड_डेटा = linuxhint_spark_app.read.table ( 'एग्री_टेबल1' )

लोड_डाटा.शो ()

आउटपुट:

हम देख सकते हैं कि टेबल को PySpark DataFrame में लोड किया गया है।

SQL क्वेरी को निष्पादित करना

अब, हम स्पार्क.एसक्यूएल () फ़ंक्शन का उपयोग करके लोड किए गए डेटाफ़्रेम पर कुछ एसक्यूएल प्रश्नों को निष्पादित करते हैं।

# उपरोक्त तालिका से सभी कॉलम प्रदर्शित करने के लिए SELECT कमांड का उपयोग करें।

linuxhint_spark_app.sql( 'Agri_Table1 से चुनें *' )।दिखाना()

# कहां कारण

linuxhint_spark_app.sql( 'Agri_Table1 से चुनें * जहां मिट्टी_स्थिति = 'सूखा'' )।दिखाना()

linuxhint_spark_app.sql( 'Agri_Table1 से चुनें* जहां एकर्स > 2000' )।दिखाना()

आउटपुट:

  1. पहली क्वेरी डेटाफ़्रेम से सभी कॉलम और रिकॉर्ड प्रदर्शित करती है।
  2. दूसरी क्वेरी 'Soil_status' कॉलम के आधार पर रिकॉर्ड प्रदर्शित करती है। 'ड्राई' तत्व के साथ केवल तीन रिकॉर्ड हैं।
  3. अंतिम क्वेरी 'एकड़' के साथ दो रिकॉर्ड लौटाती है जो 2000 से अधिक हैं।

Pyspark.sql.DataFrameWriter.insertInto ()

insertInto () फ़ंक्शन का उपयोग करके, हम DataFrame को मौजूदा तालिका में जोड़ सकते हैं। हम इस फ़ंक्शन का उपयोग selectExpr() के साथ कॉलम नामों को परिभाषित करने के लिए कर सकते हैं और फिर इसे तालिका में सम्मिलित कर सकते हैं। यह फ़ंक्शन तालिका नाम को पैरामीटर के रूप में भी लेता है।

वाक्य - विन्यास:

DataFrame_obj.write.insertInto('Table_name')

इस परिदृश्य में, हम पिछली तालिका का उपयोग करते हैं जो कि PySpark DataFrame से बनाई गई थी। सुनिश्चित करें कि आपको अपने परिवेश में पिछले परिदृश्य कोड स्निपेट्स को लागू करने की आवश्यकता है।

उदाहरण:

दो रिकॉर्ड के साथ एक नया डेटाफ़्रेम बनाएँ और उन्हें 'Agri_Table1' तालिका में डालें।

pyspark आयात करें

pyspark.sql से SparkSession आयात करें

linuxhint_spark_app = SparkSession.builder.appName ( 'लिनक्स संकेत' .getOrCreate()

# खेती डेटा 2 पंक्तियों के साथ

कृषि = [{ 'मिट्टी के प्रकार' : 'रेत' , 'सिंचाई_उपलब्धता' : 'नहीं' , 'एकड़' : 2500 , 'मृदा_स्थिति' : 'सूखा' ,
'देश' : 'अमेरीका' },

{ 'मिट्टी के प्रकार' : 'रेत' , 'सिंचाई_उपलब्धता' : 'नहीं' , 'एकड़' : 1200 , 'मृदा_स्थिति' : 'गीला' ,
'देश' : 'जापान' }]

# उपरोक्त डेटा से डेटाफ़्रेम बनाएं

agri_df2 = linuxhint_spark_app.createDataFrame (कृषि)

agri_df2.शो ()

# लिखें.insertInto ()

agri_df2.selectExpr ( 'एकड़' , 'देश' , 'सिंचाई_उपलब्धता' , 'मिट्टी के प्रकार' ,
'मृदा_स्थिति' .write.insertInto( 'एग्री_टेबल1' )

# अंतिम एग्री_टेबल1 प्रदर्शित करें

linuxhint_spark_app.sql( 'Agri_Table1 से चुनें*' )।दिखाना()

आउटपुट:

अब, DataFrame में मौजूद पंक्तियों की कुल संख्या 7 है।

निष्कर्ष

अब आप समझ गए हैं कि तालिका में PySpark DataFrame को write.saveAsTable () फ़ंक्शन का उपयोग करके कैसे लिखना है। यह तालिका का नाम और अन्य वैकल्पिक पैरामीटर लेता है। फिर, हमने इस तालिका को स्पार्क.रीड.टेबल () फ़ंक्शन का उपयोग करके PySpark DataFrame में लोड किया। यह केवल एक पैरामीटर लेता है जो पथ/तालिका का नाम है। यदि आप नए DataFrame को मौजूदा तालिका में जोड़ना चाहते हैं, तो insertInto() फ़ंक्शन का उपयोग करें।