Πώς να διαβάσετε και να γράψετε δεδομένα πίνακα στο PySpark

Pos Na Diabasete Kai Na Grapsete Dedomena Pinaka Sto Pyspark



Η επεξεργασία δεδομένων στο PySpark είναι ταχύτερη εάν τα δεδομένα φορτωθούν με τη μορφή πίνακα. Με αυτό, χρησιμοποιώντας τις εκφράσεις SQl, η επεξεργασία θα είναι γρήγορη. Έτσι, η μετατροπή του PySpark DataFrame/RDD σε πίνακα πριν από την αποστολή του για επεξεργασία είναι η καλύτερη προσέγγιση. Σήμερα, θα δούμε πώς να διαβάζουμε τα δεδομένα πίνακα στο PySpark DataFrame, να γράφουμε το PySpark DataFrame στον πίνακα και να εισάγουμε νέο DataFrame στον υπάρχοντα πίνακα χρησιμοποιώντας τις ενσωματωμένες συναρτήσεις. Πάμε!

Pyspark.sql.DataFrameWriter.saveAsTable()

Αρχικά, θα δούμε πώς να γράψουμε το υπάρχον PySpark DataFrame στον πίνακα χρησιμοποιώντας τη συνάρτηση write.saveAsTable(). Χρειάζεται το όνομα του πίνακα και άλλες προαιρετικές παραμέτρους όπως modes, partionBy κ.λπ., για να γράψει το DataFrame στον πίνακα. Αποθηκεύεται ως αρχείο παρκέ.

Σύνταξη:







dataframe_obj.write.saveAsTable(διαδρομή/όνομα_πίνακα, λειτουργία, partitionBy,…)
  1. Το Table_name είναι το όνομα του πίνακα που δημιουργείται από το dataframe_obj.
  2. Μπορούμε να προσαρτήσουμε/αντικαταστήσουμε τα δεδομένα του πίνακα χρησιμοποιώντας την παράμετρο mode.
  3. Το partitionBy παίρνει τις μονή/πολλαπλές στήλες για να δημιουργήσει διαμερίσματα με βάση τις τιμές σε αυτές τις παρεχόμενες στήλες.

Παράδειγμα 1:

Δημιουργήστε ένα PySpark DataFrame με 5 σειρές και 4 στήλες. Γράψτε αυτό το Dataframe σε έναν πίνακα που ονομάζεται 'Agri_Table1'.



εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# γεωργικά δεδομένα με 5 σειρές και 5 στήλες

agri =[{ 'Τύπος_Εδάφους' : 'Μαύρος' , 'Irrigation_availability' : 'Οχι' , 'στρέμματα' : 2500 , 'Κατάσταση_εδάφους' : 'Ξηρός' ,
'Χώρα' : 'ΗΠΑ' },

{ 'Τύπος_Εδάφους' : 'Μαύρος' , 'Irrigation_availability' : 'Ναί' , 'στρέμματα' : 3500 , 'Κατάσταση_εδάφους' : 'Βρεγμένος' ,
'Χώρα' : 'Ινδία' },

{ 'Τύπος_Εδάφους' : 'Το κόκκινο' , 'Irrigation_availability' : 'Ναί' , 'στρέμματα' : 210 , 'Κατάσταση_εδάφους' : 'Ξηρός' ,
'Χώρα' : 'ΗΝΩΜΕΝΟ ΒΑΣΙΛΕΙΟ' },

{ 'Τύπος_Εδάφους' : 'Αλλα' , 'Irrigation_availability' : 'Οχι' , 'στρέμματα' : 1000 , 'Κατάσταση_εδάφους' : 'Βρεγμένος' ,
'Χώρα' : 'ΗΠΑ' },

{ 'Τύπος_Εδάφους' : 'Αμμος' , 'Irrigation_availability' : 'Οχι' , 'στρέμματα' : 500 , 'Κατάσταση_εδάφους' : 'Ξηρός' ,
'Χώρα' : 'Ινδία' }]



# δημιουργήστε το πλαίσιο δεδομένων από τα παραπάνω δεδομένα

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Γράψτε το παραπάνω DataFrame στον πίνακα.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Παραγωγή:







Μπορούμε να δούμε ότι δημιουργείται ένα αρχείο παρκέ με τα προηγούμενα δεδομένα PySpark.



Παράδειγμα 2:

Σκεφτείτε το προηγούμενο DataFrame και γράψτε το 'Agri_Table2' στον πίνακα διαιρώντας τις εγγραφές με βάση τις τιμές στη στήλη 'Χώρα'.

# Γράψτε το παραπάνω DataFrame στον πίνακα με την παράμετρο partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Χώρα' ])

Παραγωγή:

Υπάρχουν τρεις μοναδικές τιμές στη στήλη 'Χώρα' - 'Ινδία', 'Ηνωμένο Βασίλειο' και 'ΗΠΑ'. Έτσι, δημιουργούνται τρία διαμερίσματα. Κάθε διαμέρισμα περιέχει τα αρχεία παρκέ.

Pyspark.sql.DataFrameReader.table()

Ας φορτώσουμε τον πίνακα στο PySpark DataFrame χρησιμοποιώντας τη συνάρτηση spark.read.table(). Χρειάζεται μόνο μία παράμετρος που είναι το όνομα διαδρομής/πίνακα. Φορτώνει απευθείας τον πίνακα στο PySpark DataFrame και όλες οι συναρτήσεις SQL που εφαρμόζονται στο PySpark DataFrame μπορούν επίσης να εφαρμοστούν σε αυτό το φορτωμένο DataFrame.

Σύνταξη:

spark_app.read.table (διαδρομή/'Όνομα_Πίνακα')

Σε αυτό το σενάριο, χρησιμοποιούμε τον προηγούμενο πίνακα που δημιουργήθηκε από το PySpark DataFrame. Βεβαιωθείτε ότι πρέπει να εφαρμόσετε τα προηγούμενα αποσπάσματα κώδικα σεναρίου στο περιβάλλον σας.

Παράδειγμα:

Φορτώστε τον πίνακα 'Agri_Table1' στο DataFrame με το όνομα 'loaded_data'.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

Παραγωγή:

Μπορούμε να δούμε ότι ο πίνακας έχει φορτωθεί στο PySpark DataFrame.

Εκτέλεση των ερωτημάτων SQL

Τώρα, εκτελούμε ορισμένα ερωτήματα SQL στο φορτωμένο DataFrame χρησιμοποιώντας τη συνάρτηση spark.sql().

# Χρησιμοποιήστε την εντολή SELECT για να εμφανίσετε όλες τις στήλες από τον παραπάνω πίνακα.

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).προβολή()

Ρήτρα # WHERE

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry'' ).προβολή()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000 ' ).προβολή()

Παραγωγή:

  1. Το πρώτο ερώτημα εμφανίζει όλες τις στήλες και τις εγγραφές από το DataFrame.
  2. Το δεύτερο ερώτημα εμφανίζει τις εγγραφές με βάση τη στήλη 'Soil_status'. Υπάρχουν μόνο τρεις δίσκοι με το στοιχείο 'Dry'.
  3. Το τελευταίο ερώτημα επιστρέφει δύο εγγραφές με 'Acres' που είναι μεγαλύτερες από 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Χρησιμοποιώντας τη συνάρτηση insertInto(), μπορούμε να προσαρτήσουμε το DataFrame στον υπάρχοντα πίνακα. Μπορούμε να χρησιμοποιήσουμε αυτή τη συνάρτηση μαζί με το selectExpr() για να ορίσουμε τα ονόματα των στηλών και στη συνέχεια να την εισάγουμε στον πίνακα. Αυτή η συνάρτηση λαμβάνει επίσης ως παράμετρο το tableName.

Σύνταξη:

DataFrame_obj.write.insertInto('Όνομα_Πίνακα')

Σε αυτό το σενάριο, χρησιμοποιούμε τον προηγούμενο πίνακα που δημιουργήθηκε από το PySpark DataFrame. Βεβαιωθείτε ότι πρέπει να εφαρμόσετε τα προηγούμενα αποσπάσματα κώδικα σεναρίου στο περιβάλλον σας.

Παράδειγμα:

Δημιουργήστε ένα νέο DataFrame με δύο εγγραφές και τοποθετήστε τις στον πίνακα 'Agri_Table1'.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# γεωργικά δεδομένα με 2 σειρές

agri =[{ 'Τύπος_Εδάφους' : 'Αμμος' , 'Irrigation_availability' : 'Οχι' , 'στρέμματα' : 2500 , 'Κατάσταση_εδάφους' : 'Ξηρός' ,
'Χώρα' : 'ΗΠΑ' },

{ 'Τύπος_Εδάφους' : 'Αμμος' , 'Irrigation_availability' : 'Οχι' , 'στρέμματα' : 1200 , 'Κατάσταση_εδάφους' : 'Βρεγμένος' ,
'Χώρα' : 'Ιαπωνία' }]

# δημιουργήστε το πλαίσιο δεδομένων από τα παραπάνω δεδομένα

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'στρέμματα' , 'Χώρα' , 'Irrigation_availability' , 'Τύπος_Εδάφους' ,
'Κατάσταση_εδάφους' ).write.insertInto( 'Agri_Table1' )

# Εμφάνιση του τελικού Agri_Table1

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1' ).προβολή()

Παραγωγή:

Τώρα, ο συνολικός αριθμός σειρών που υπάρχουν στο DataFrame είναι 7.

συμπέρασμα

Τώρα καταλαβαίνετε πώς να γράψετε το PySpark DataFrame στον πίνακα χρησιμοποιώντας τη συνάρτηση write.saveAsTable(). Παίρνει το όνομα του πίνακα και άλλες προαιρετικές παραμέτρους. Στη συνέχεια, φορτώσαμε αυτόν τον πίνακα στο PySpark DataFrame χρησιμοποιώντας τη συνάρτηση spark.read.table(). Χρειάζεται μόνο μία παράμετρος που είναι το όνομα διαδρομής/πίνακα. Εάν θέλετε να προσθέσετε το νέο DataFrame στον υπάρχοντα πίνακα, χρησιμοποιήστε τη συνάρτηση insertInto().