Μετατροπή PySpark DataFrame σε CSV

Metatrope Pyspark Dataframe Se Csv



Ας δούμε τα τέσσερα διαφορετικά σενάρια μετατροπής του PySpark DataFrame σε CSV. Απευθείας, χρησιμοποιούμε τη μέθοδο write.csv() για να μετατρέψουμε το PySpark DataFrame σε CSV. Χρησιμοποιώντας τη συνάρτηση to_csv(), μετατρέπουμε το PySpark Pandas DataFrame σε CSV. Μπορεί επίσης να είναι δυνατό μετατρέποντάς το στον πίνακα NumPy.

Θέμα Περιεχομένων:

Εάν θέλετε να μάθετε για το PySpark DataFrame και την εγκατάσταση της μονάδας, προχωρήστε σε αυτό άρθρο .







PySpark DataFrame σε CSV με μετατροπή σε Pandas DataFrame

Η to_csv() είναι μια μέθοδος που είναι διαθέσιμη στη λειτουργική μονάδα Pandas η οποία μετατρέπει το Pandas DataFrame σε CSV. Πρώτα, πρέπει να μετατρέψουμε το PySpark DataFrame σε Pandas DataFrame. Για να γίνει αυτό χρησιμοποιείται η μέθοδος toPandas(). Ας δούμε τη σύνταξη της to_csv() μαζί με τις παραμέτρους της.



Σύνταξη:



pandas_dataframe_obj.to_csv(path/ 'file_name.csv' , επί κεφαλής , ευρετήριο, στήλες, λειτουργία...)
  1. Πρέπει να καθορίσουμε το όνομα αρχείου του αρχείου CSV. Εάν θέλετε να αποθηκεύσετε το ληφθέν CSV σε μια συγκεκριμένη θέση στον υπολογιστή σας, μπορείτε επίσης να καθορίσετε τη διαδρομή μαζί με το όνομα του αρχείου.
  2. Οι στήλες περιλαμβάνονται εάν η κεφαλίδα έχει οριστεί σε 'True'. Εάν δεν χρειάζεστε στήλες, ορίστε την κεφαλίδα σε 'False'.
  3. Οι δείκτες καθορίζονται εάν το ευρετήριο έχει οριστεί σε 'True'. Εάν δεν χρειάζεστε δείκτες, ορίστε το ευρετήριο σε 'False'.
  4. Η παράμετρος Columns παίρνει μια λίστα ονομάτων στηλών στην οποία μπορούμε να καθορίσουμε ποιες συγκεκριμένες στήλες εξάγονται στο αρχείο CSV.
  5. Μπορούμε να προσθέσουμε τις εγγραφές στο CSV χρησιμοποιώντας την παράμετρο mode. Προσάρτημα - το 'a' χρησιμοποιείται για να γίνει αυτό.

Παράδειγμα 1: Με τις παραμέτρους κεφαλίδας και ευρετηρίου

Δημιουργήστε το PySpark DataFrame 'skills_df' με 3 σειρές και 4 στήλες. Μετατρέψτε αυτό το DataFrame σε CSV μετατρέποντάς το πρώτα στο Pandas DataFrame.





εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# δεδομένα δεξιοτήτων με 3 σειρές και 4 στήλες

δεξιότητες =[{ 'ταυτότητα' : 123 , 'πρόσωπο' : 'Μέλι' , 'επιδεξιότητα' : 'ζωγραφική' , 'βραβείο' : 25000 },

{ 'ταυτότητα' : 112 , 'πρόσωπο' : 'Mouni' , 'επιδεξιότητα' : 'χορός' , 'βραβείο' : 2000 },

{ 'ταυτότητα' : 153 , 'πρόσωπο' : 'Tulasi' , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' , 'βραβείο' : 1200 }

]

# δημιουργήστε το πλαίσιο δεδομένων δεξιοτήτων από τα παραπάνω δεδομένα

skills_df = linuxhint_spark_app.createDataFrame(skills)

skills_df.show()

# Μετατροπή skills_df σε pandas DataFrame

pandas_skills_df= skills_df.toPandas()

print(pandas_skills_df)

# Μετατρέψτε αυτό το DataFrame σε csv με κεφαλίδα και ευρετήριο

pandas_skills_df.to_csv( 'pandas_skills1.csv' , επί κεφαλής =True, index=True)

Παραγωγή:



Μπορούμε να δούμε ότι το PySpark DataFrame μετατρέπεται σε Pandas DataFrame. Ας δούμε αν μετατρέπεται σε CSV με ονόματα στηλών και δείκτες:

Παράδειγμα 2: Προσθέστε τα δεδομένα στο CSV

Δημιουργήστε ένα ακόμη PySpark DataFrame με 1 εγγραφή και προσθέστε το στο CSV που δημιουργήθηκε ως μέρος του πρώτου παραδείγματος. Βεβαιωθείτε ότι πρέπει να ορίσουμε την κεφαλίδα σε 'False' μαζί με την παράμετρο mode. Διαφορετικά, τα ονόματα στηλών προσαρτώνται επίσης ως σειρά.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

δεξιότητες =[{ 'ταυτότητα' : 90 , 'πρόσωπο' : 'Μπχαργκάβ' , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' , 'βραβείο' : 12000 }

]

# δημιουργήστε το πλαίσιο δεδομένων δεξιοτήτων από τα παραπάνω δεδομένα

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Μετατροπή skills_df σε pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Προσθέστε αυτό το DataFrame στο αρχείο pandas_skills1.csv

pandas_skills_df.to_csv( 'pandas_skills1.csv' , mode= 'ένα' , επί κεφαλής =Λάθος)

Έξοδος CSV:

Μπορούμε να δούμε ότι μια νέα σειρά προστίθεται στο αρχείο CSV.

Παράδειγμα 3: Με την παράμετρο Columns

Ας έχουμε το ίδιο DataFrame και ας το μετατρέψουμε σε CSV με δύο στήλες: 'πρόσωπο' και 'βραβείο'.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# δεδομένα δεξιοτήτων με 3 σειρές και 4 στήλες

δεξιότητες =[{ 'ταυτότητα' : 123 , 'πρόσωπο' : 'Μέλι' , 'επιδεξιότητα' : 'ζωγραφική' , 'βραβείο' : 25000 },

{ 'ταυτότητα' : 112 , 'πρόσωπο' : 'Mouni' , 'επιδεξιότητα' : 'χορός' , 'βραβείο' : 2000 },

{ 'ταυτότητα' : 153 , 'πρόσωπο' : 'Tulasi' , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' , 'βραβείο' : 1200 }

]

# δημιουργήστε το πλαίσιο δεδομένων δεξιοτήτων από τα παραπάνω δεδομένα

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Μετατροπή skills_df σε pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Μετατρέψτε αυτό το DataFrame σε csv με συγκεκριμένες στήλες

pandas_skills_df.to_csv( 'pandas_skills2.csv' , στήλες=[ 'πρόσωπο' , 'βραβείο' ])

Έξοδος CSV:

Μπορούμε να δούμε ότι μόνο οι στήλες 'πρόσωπο' και 'βραβείο' υπάρχουν στο αρχείο CSV.

PySpark Pandas DataFrame σε CSV χρησιμοποιώντας τη μέθοδο To_Csv()

Η to_csv() είναι μια μέθοδος που είναι διαθέσιμη στη λειτουργική μονάδα Pandas η οποία μετατρέπει το Pandas DataFrame σε CSV. Πρώτα, πρέπει να μετατρέψουμε το PySpark DataFrame σε Pandas DataFrame. Για να γίνει αυτό χρησιμοποιείται η μέθοδος toPandas(). Ας δούμε τη σύνταξη της to_csv() μαζί με τις παραμέτρους της:

Σύνταξη:

pyspark_pandas_dataframe_obj.to_csv(path/ 'file_name.csv' , επί κεφαλής , ευρετήριο, στήλες,...)
  1. Πρέπει να καθορίσουμε το όνομα αρχείου του αρχείου CSV. Εάν θέλετε να αποθηκεύσετε το ληφθέν CSV σε μια συγκεκριμένη θέση στον υπολογιστή σας, μπορείτε επίσης να καθορίσετε τη διαδρομή μαζί με το όνομα του αρχείου.
  2. Οι στήλες περιλαμβάνονται εάν η κεφαλίδα έχει οριστεί σε 'True'. Εάν δεν χρειάζεστε στήλες, ορίστε την κεφαλίδα σε 'False'.
  3. Οι δείκτες καθορίζονται εάν το ευρετήριο έχει οριστεί σε 'True'. Εάν δεν χρειάζεστε δείκτες, ορίστε το ευρετήριο σε 'False'.
  4. Η παράμετρος στηλών λαμβάνει μια λίστα ονομάτων στηλών στην οποία μπορούμε να καθορίσουμε ποιες συγκεκριμένες στήλες εξάγονται στο αρχείο CSV.

Παράδειγμα 1: Με την παράμετρο Columns

Δημιουργήστε ένα PySpark Pandas DataFrame με 3 στήλες και μετατρέψτε το σε CSV χρησιμοποιώντας to_csv() με τις στήλες 'person' και 'prize'.

από panda εισαγωγής pyspark

pyspark_pandas_dataframe=pandas.DataFrame({ 'ταυτότητα' :[ 90 , 78 , 90 , 57 ], 'πρόσωπο' :[ 'Μέλι' , 'Mouni' , 'ο ίδιος' , 'radha' ], 'βραβείο' :[ 1 , 2 , 3 , 4 ]})

print(pyspark_pandas_dataframe)

# Μετατρέψτε αυτό το DataFrame σε csv με συγκεκριμένες στήλες

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , στήλες=[ 'πρόσωπο' , 'βραβείο' ])

Παραγωγή:

Μπορούμε να δούμε ότι το PySpark Pandas DataFrame μετατρέπεται σε CSV με δύο κατατμήσεις. Κάθε διαμέρισμα περιέχει 2 εγγραφές. Επίσης, οι στήλες στο CSV είναι μόνο 'person' και 'prize'.

Αρχείο κατάτμησης 1:

Αρχείο κατάτμησης 2:

Παράδειγμα 2: Με την Παράμετρο Κεφαλίδας

Χρησιμοποιήστε το προηγούμενο DataFrame και καθορίστε την παράμετρο κεφαλίδας ορίζοντας την σε 'True'.

από panda εισαγωγής pyspark

pyspark_pandas_dataframe=pandas.DataFrame({ 'ταυτότητα' :[ 90 , 78 , 90 , 57 ], 'πρόσωπο' :[ 'Μέλι' , 'Mouni' , 'ο ίδιος' , 'radha' ], 'βραβείο' :[ 1 , 2 , 3 , 4 ]})

# Μετατρέψτε αυτό το DataFrame σε csv με κεφαλίδα.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , επί κεφαλής = Αλήθεια)

Έξοδος CSV:

Μπορούμε να δούμε ότι το PySpark Pandas DataFrame μετατρέπεται σε CSV με δύο κατατμήσεις. Κάθε διαμέρισμα περιέχει 2 εγγραφές με ονόματα στηλών.

Αρχείο κατάτμησης 1:

Αρχείο κατάτμησης 2:

PySpark Pandas DataFrame σε CSV με μετατροπή σε NumPy Array

Έχουμε την επιλογή να μετατρέψουμε το PySpark Pandas DataFrame σε CSV μετατρέποντας στον πίνακα Numpy. Η to_numpy() είναι μια μέθοδος που είναι διαθέσιμη στη λειτουργική μονάδα PySpark Pandas η οποία μετατρέπει το PySpark Pandas DataFrame στον πίνακα NumPy.

Σύνταξη:

pyspark_pandas_dataframe_obj.to_numpy()

Δεν θα πάρει καμία παράμετρο.

Χρησιμοποιώντας τη μέθοδο Tofile().

Μετά τη μετατροπή στον πίνακα NumPy, μπορούμε να χρησιμοποιήσουμε τη μέθοδο tofile() για να μετατρέψουμε το NumPy σε CSV. Εδώ, αποθηκεύει κάθε εγγραφή σε ένα νέο κελί σε στήλη CSV.

Σύνταξη:

array_obj.to_numpy(όνομα αρχείου/διαδρομή, sep=' )

Παίρνει το όνομα αρχείου ή τη διαδρομή ενός CSV και ενός διαχωριστικού.

Παράδειγμα:

Δημιουργήστε το PySpark Pandas DataFrame με 3 στήλες και 4 εγγραφές και μετατρέψτε το σε CSV μετατρέποντάς το πρώτα σε πίνακα NumPy.

από panda εισαγωγής pyspark

pyspark_pandas_dataframe=pandas.DataFrame({ 'ταυτότητα' :[ 90 , 78 , 90 , 57 ], 'πρόσωπο' :[ 'Μέλι' , 'Mouni' , 'ο ίδιος' , 'radha' ], 'βραβείο' :[ 1 , 2 , 3 , 4 ]})

# Μετατρέψτε το παραπάνω DataFrame σε numpy array

μετατροπή = pyspark_pandas_dataframe.to_numpy()

εκτύπωση (μετατροπή)

# Χρήση tofile()

converted.tofile( 'converted1.csv' , σεπ = ',' )

Παραγωγή:

[[ 90 'Μέλι' 1 ]

[ 78 'Mouni' 2 ]

[ 90 'ο ίδιος' 3 ]

[ 57 'radha' 4 ]]

Μπορούμε να δούμε ότι το PySpark Pandas DataFrame μετατρέπεται σε πίνακα NumPy (12 τιμές). Εάν μπορείτε να δείτε τα δεδομένα CSV, αποθηκεύει κάθε τιμή κελιού σε μια νέα στήλη.

PySpark DataFrame σε CSV Χρησιμοποιώντας τη μέθοδο Write.Csv().

Η μέθοδος write.csv() παίρνει το όνομα/διαδρομή του αρχείου όπου πρέπει να αποθηκεύσουμε το αρχείο CSV ως παράμετρο.

Σύνταξη:

dataframe_object.coalesce( 1 ).write.csv( 'όνομα αρχείου' )

Στην πραγματικότητα, το CSV αποθηκεύεται ως κατατμήσεις (περισσότερες από μία). Για να απαλλαγούμε από αυτό, συγχωνεύουμε όλα τα κατατμημένα αρχεία CSV σε ένα. Σε αυτό το σενάριο, χρησιμοποιούμε τη συνάρτηση coalesce(). Τώρα, μπορούμε να δούμε μόνο ένα αρχείο CSV με όλες τις σειρές από το PySpark DataFrame.

Παράδειγμα:

Εξετάστε το PySpark DataFrame με 4 εγγραφές που έχουν 4 στήλες. Γράψτε αυτό το DataFrame σε CSV με το αρχείο που ονομάζεται 'market_details'.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# δεδομένα αγοράς με 4 σειρές και 4 στήλες

αγορά =[{ 'στα μέσα' : 'mz-001' , 'm_name' : 'ΑΛΦΑΒΗΤΟ' , 'm_city' : 'Δελχί' , 'm_state' : 'Δελχί' },

{ 'στα μέσα' : 'mz-002' , 'm_name' : 'XYZ' , 'm_city' : 'πάτνα' , 'm_state' : «τύχη» },

{ 'στα μέσα' : 'mz-003' , 'm_name' : 'PQR' , 'm_city' : 'Φλόριντα' , 'm_state' : 'ένας' },

{ 'στα μέσα' : 'mz-004' , 'm_name' : 'ΑΛΦΑΒΗΤΟ' , 'm_city' : 'Δελχί' , 'm_state' : «τύχη» }

]



# δημιουργήστε το πλαίσιο δεδομένων αγοράς από τα παραπάνω δεδομένα

market_df = linuxhint_spark_app.createDataFrame(market)

# Πραγματικά δεδομένα αγοράς

market_df.show()

#write.csv()

market_df.coalesce( 1 ).write.csv( 'market_details' )

Παραγωγή:

Ας ελέγξουμε για το αρχείο:

Ανοίξτε το τελευταίο αρχείο για να δείτε τις εγγραφές.

συμπέρασμα

Μάθαμε τα τέσσερα διαφορετικά σενάρια που μετατρέπουν το PySpark DataFrame σε CSV με παραδείγματα λαμβάνοντας υπόψη διαφορετικές παραμέτρους. Όταν εργάζεστε με το PySpark DataFrame, έχετε δύο επιλογές για να μετατρέψετε αυτό το DataFrame σε CSV: ένας τρόπος είναι η χρήση της μεθόδου write() και ένας άλλος είναι η χρήση της μεθόδου to_csv() με μετατροπή σε Pandas DataFrame. Εάν εργάζεστε με το PySpark Pandas DataFrame, μπορείτε επίσης να χρησιμοποιήσετε τα to_csv() και tofile() μετατρέποντας σε πίνακα NumPy.