Μετατροπή PySpark DataFrame σε JSON

Metatrope Pyspark Dataframe Se Json



Η μετάδοση δομημένων δεδομένων χρησιμοποιώντας JSON είναι δυνατή και επίσης καταναλώνει χαμηλή μνήμη. Σε σύγκριση με το PySpark RDD ή το PySpark DataFrame, το JSON καταναλώνει χαμηλή μνήμη και σειριοποίηση που είναι δυνατή με το JSON. Μπορούμε να μετατρέψουμε το PySpark DataFrame σε JSON χρησιμοποιώντας τη μέθοδο pyspark.sql.DataFrameWriter.json(). Εκτός από αυτό, υπάρχουν δύο άλλοι τρόποι μετατροπής του DataFrame σε JSON.

Θέμα Περιεχομένων:

Ας εξετάσουμε ένα απλό PySpark DataFrame σε όλα τα παραδείγματα και ας το μετατρέψουμε σε JSON χρησιμοποιώντας τις αναφερόμενες συναρτήσεις.







Απαιτούμενη ενότητα:

Εγκαταστήστε τη βιβλιοθήκη PySpark στο περιβάλλον σας εάν δεν είναι ακόμα εγκατεστημένη. Μπορείτε να ανατρέξετε στην ακόλουθη εντολή για να το εγκαταστήσετε:



pip εγκατάσταση pyspark

PySpark DataFrame σε JSON Χρησιμοποιώντας To_json() με ToPandas()

Η μέθοδος to_json() είναι διαθέσιμη στη λειτουργική μονάδα Pandas που μετατρέπει το Pandas DataFrame σε JSON. Μπορούμε να χρησιμοποιήσουμε αυτήν τη μέθοδο εάν μετατρέψουμε το PySpark DataFrame σε Pandas DataFrame. Για να μετατρέψετε το PySpark DataFrame σε Pandas DataFrame, χρησιμοποιείται η μέθοδος toPandas(). Ας δούμε τη σύνταξη της to_json() μαζί με τις παραμέτρους της.



Σύνταξη:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Το Orient χρησιμοποιείται για την εμφάνιση του JSON που έχει μετατραπεί ως την επιθυμητή μορφή. Παίρνει 'εγγραφές', 'πίνακας', 'τιμές', 'στήλες', 'ευρετήριο', 'διαίρεση'.
  2. Το ευρετήριο χρησιμοποιείται για τη συμπερίληψη/αφαίρεση του ευρετηρίου από τη συμβολοσειρά JSON που έχει μετατραπεί. Εάν έχει οριστεί σε 'True', εμφανίζονται οι δείκτες. Διαφορετικά, οι δείκτες δεν θα εμφανίζονται εάν ο προσανατολισμός είναι 'split' ή 'table'.

Παράδειγμα 1: Προσανατολισμός ως 'Εγγραφές'

Δημιουργήστε ένα PySpark DataFrame 'skills_df' με 3 σειρές και 4 στήλες. Μετατρέψτε αυτό το DataFrame σε JSON καθορίζοντας την παράμετρο orient ως 'εγγραφές'.

εισαγωγή pyspark

εισαγωγή παντα

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# δεδομένα δεξιοτήτων με 3 σειρές και 4 στήλες

δεξιότητες =[{ 'ταυτότητα' : 123 , 'πρόσωπο' : 'Μέλι' , 'επιδεξιότητα' : 'ζωγραφική' , 'βραβείο' : 25000 },

{ 'ταυτότητα' : 112 , 'πρόσωπο' : 'Mouni' , 'επιδεξιότητα' : 'χορός' , 'βραβείο' : 2000 },

{ 'ταυτότητα' : 153 , 'πρόσωπο' : 'Tulasi' , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' , 'βραβείο' : 1200 }

]

# δημιουργήστε το πλαίσιο δεδομένων δεξιοτήτων από τα παραπάνω δεδομένα

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Πραγματικά δεδομένα δεξιοτήτων

skills_df.show()

# Μετατροπή σε JSON χρησιμοποιώντας to_json() με orient ως 'εγγραφές'

json_skills_data = skills_df.toPandas().to_json(orient= 'ρεκόρ' )

print(json_skills_data)

Παραγωγή:



+---+------+-----+--------+

| ταυτότητα|άτομο|βραβείο| δεξιότητα|

+---+------+-----+--------+

| 123 | Μέλι| 25000 |ζωγραφική|

| 112 | Mouni| 2000 | χορός|

| 153 |Tulasi| 1200 | ανάγνωση|

+---+------+-----+--------+

[{ 'ταυτότητα' : 123 , 'πρόσωπο' : 'Μέλι' , 'βραβείο' : 25000 , 'επιδεξιότητα' : 'ζωγραφική' },{ 'ταυτότητα' : 112 , 'πρόσωπο' : 'Mouni' , 'βραβείο' : 2000 , 'επιδεξιότητα' : 'χορός' },{ 'ταυτότητα' : 153 , 'πρόσωπο' : 'Tulasi' , 'βραβείο' : 1200 , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' }]

Μπορούμε να δούμε ότι το PySpark DataFrame μετατρέπεται στον πίνακα JSON με ένα λεξικό τιμών. Εδώ, τα κλειδιά αντιπροσωπεύουν το όνομα της στήλης και η τιμή αντιπροσωπεύει την τιμή γραμμής/κελλιού στο PySpark DataFrame.

Παράδειγμα 2: Προσανατολισμός ως 'Split'

Η μορφή JSON που επιστρέφεται από τον προσανατολισμό 'split' περιλαμβάνει τα ονόματα στηλών που έχουν μια λίστα στηλών, λίστα ευρετηρίου και λίστα δεδομένων. Ακολουθεί η μορφή του 'split' orient.

# Μετατροπή σε JSON χρησιμοποιώντας to_json() με orient ως 'split'

json_skills_data = skills_df.toPandas().to_json(orient= 'διαίρεση' )

print(json_skills_data)

Παραγωγή:

{ 'στήλες' :[ 'ταυτότητα' , 'πρόσωπο' , 'βραβείο' , 'επιδεξιότητα' ], 'δείκτης' :[ 0 , 1 , 2 ], 'δεδομένα' :[[ 123 , 'Μέλι' , 25000 , 'ζωγραφική' ],[ 112 , 'Mouni' , 2000 , 'χορός' ],[ 153 , 'Tulasi' , 1200 , 'ΑΝΑΓΝΩΣΗ' ]]}

Παράδειγμα 3: Προσανατολισμός ως 'Ευρετήριο'

Εδώ, κάθε γραμμή από το PySpark DataFrame αποσύρεται με τη μορφή ενός λεξικού με το κλειδί ως όνομα στήλης. Για κάθε λεξικό, η θέση ευρετηρίου καθορίζεται ως κλειδί.

# Μετατροπή σε JSON χρησιμοποιώντας to_json() με orient ως 'index'

json_skills_data = skills_df.toPandas().to_json(orient= 'δείκτης' )

print(json_skills_data)

Παραγωγή:

{ '0' :{ 'ταυτότητα' : 123 , 'πρόσωπο' : 'Μέλι' , 'βραβείο' : 25000 , 'επιδεξιότητα' : 'ζωγραφική' }, '1' :{ 'ταυτότητα' : 112 , 'πρόσωπο' : 'Mouni' , 'βραβείο' : 2000 , 'επιδεξιότητα' : 'χορός' }, '2' :{ 'ταυτότητα' : 153 , 'πρόσωπο' : 'Tulasi' , 'βραβείο' : 1200 , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' }}

Παράδειγμα 4: Προσανατολισμός ως 'Στήλες'

Οι στήλες είναι το κλειδί για κάθε εγγραφή. Κάθε στήλη περιέχει ένα λεξικό που παίρνει τις τιμές στηλών με αριθμούς ευρετηρίου.

# Μετατροπή σε JSON χρησιμοποιώντας to_json() με orient ως 'στήλες'

json_skills_data = skills_df.toPandas().to_json(orient= 'στήλες' )

print(json_skills_data)

Παραγωγή:

{ 'ταυτότητα' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'πρόσωπο' :{ '0' : 'Μέλι' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'βραβείο' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'επιδεξιότητα' :{ '0' : 'ζωγραφική' , '1' : 'χορός' , '2' : 'ΑΝΑΓΝΩΣΗ' }}

Παράδειγμα 5: Προσανατολισμός ως 'Αξίες'

Εάν χρειάζεστε μόνο τις τιμές στο JSON, μπορείτε να επιλέξετε τον προσανατολισμό 'τιμές'. Εμφανίζει κάθε σειρά σε μια λίστα. Τέλος, όλες οι λίστες αποθηκεύονται σε μια λίστα. Αυτό το JSON είναι τύπου ένθετης λίστας.

# Μετατροπή σε JSON χρησιμοποιώντας to_json() με orient ως 'τιμές'

json_skills_data = skills_df.toPandas().to_json(orient= 'αξίες' )

print(json_skills_data)

Παραγωγή:

[[ 123 , 'Μέλι' , 25000 , 'ζωγραφική' ],[ 112 , 'Mouni' , 2000 , 'χορός' ],[ 153 , 'Tulasi' , 1200 , 'ΑΝΑΓΝΩΣΗ' ]]

Παράδειγμα 6: Προσανατολισμός ως 'Πίνακας'

Ο προσανατολισμός 'πίνακας' επιστρέφει το JSON που περιλαμβάνει το σχήμα με ονόματα πεδίων μαζί με τους τύπους δεδομένων στηλών, το ευρετήριο ως πρωτεύον κλειδί και την έκδοση Pandas. Τα ονόματα στηλών με τιμές εμφανίζονται ως 'δεδομένα'.

# Μετατροπή σε JSON χρησιμοποιώντας to_json() με orient ως 'πίνακας'

json_skills_data = skills_df.toPandas().to_json(orient= 'τραπέζι' )

print(json_skills_data)

Παραγωγή:

{ 'σχήμα' :{ 'χωράφια' :[{ 'όνομα' : 'δείκτης' , 'τύπος' : 'ακέραιος αριθμός' },{ 'όνομα' : 'ταυτότητα' , 'τύπος' : 'ακέραιος αριθμός' },{ 'όνομα' : 'πρόσωπο' , 'τύπος' : 'σειρά' },{ 'όνομα' : 'βραβείο' , 'τύπος' : 'ακέραιος αριθμός' },{ 'όνομα' : 'επιδεξιότητα' , 'τύπος' : 'σειρά' }], 'πρωτεύων κλειδί' :[ 'δείκτης' ], 'pandas_version' : '1.4.0' }, 'δεδομένα' :[{ 'δείκτης' : 0 , 'ταυτότητα' : 123 , 'πρόσωπο' : 'Μέλι' , 'βραβείο' : 25000 , 'επιδεξιότητα' : 'ζωγραφική' },{ 'δείκτης' : 1 , 'ταυτότητα' : 112 , 'πρόσωπο' : 'Mouni' , 'βραβείο' : 2000 , 'επιδεξιότητα' : 'χορός' },{ 'δείκτης' : 2 , 'ταυτότητα' : 153 , 'πρόσωπο' : 'Tulasi' , 'βραβείο' : 1200 , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' }]}

Παράδειγμα 7: Με παράμετρο ευρετηρίου

Αρχικά, μεταβιβάζουμε την παράμετρο ευρετηρίου ορίζοντας την σε 'True'. Θα δείτε για κάθε τιμή στήλης ότι η θέση ευρετηρίου επιστρέφεται ως κλειδί σε ένα λεξικό.

Στη δεύτερη έξοδο, μόνο τα ονόματα στηλών ('στήλες') και οι εγγραφές ('δεδομένα') επιστρέφονται χωρίς τις θέσεις ευρετηρίου αφού το ευρετήριο έχει οριστεί σε 'False'.

# Μετατροπή σε JSON χρησιμοποιώντας to_json() με index=True

json_skills_data = skills_df.toPandas().to_json(index=True)

print(json_skills_data, ' \n ' )

# Μετατροπή σε JSON χρησιμοποιώντας to_json() με index=False

json_skills_data= skills_df.toPandas().to_json(index=False,orient= 'διαίρεση' )

print(json_skills_data)

Παραγωγή:

{ 'ταυτότητα' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'πρόσωπο' :{ '0' : 'Μέλι' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'βραβείο' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'επιδεξιότητα' :{ '0' : 'ζωγραφική' , '1' : 'χορός' , '2' : 'ΑΝΑΓΝΩΣΗ' }}

{ 'στήλες' :[ 'ταυτότητα' , 'πρόσωπο' , 'βραβείο' , 'επιδεξιότητα' ], 'δεδομένα' :[[ 123 , 'Μέλι' , 25000 , 'ζωγραφική' ],[ 112 , 'Mouni' , 2000 , 'χορός' ],[ 153 , 'Tulasi' , 1200 , 'ΑΝΑΓΝΩΣΗ' ]]

PySpark DataFrame σε JSON με χρήση ToJSON()

Η μέθοδος toJSON() χρησιμοποιείται για τη μετατροπή του PySpark DataFrame σε αντικείμενο JSON. Βασικά, επιστρέφει μια συμβολοσειρά JSON που περιβάλλεται από μια λίστα. ο [‘{στήλη:τιμή,…}’,…. ] είναι η μορφή που επιστρέφεται από αυτή τη συνάρτηση. Εδώ, κάθε γραμμή από το PySpark DataFrame επιστρέφεται ως λεξικό με το όνομα της στήλης ως κλειδί.

Σύνταξη:

dataframe_object.toJSON()

Μπορεί να είναι δυνατή η μετάδοση παραμέτρων όπως το ευρετήριο, οι ετικέτες στηλών και ο τύπος δεδομένων.

Παράδειγμα:

Δημιουργήστε ένα PySpark DataFrame 'skills_df' με 5 σειρές και 4 στήλες. Μετατρέψτε αυτό το DataFrame σε JSON χρησιμοποιώντας τη μέθοδο toJSON().

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# δεδομένα δεξιοτήτων με 5 σειρές και 4 στήλες

δεξιότητες =[{ 'ταυτότητα' : 123 , 'πρόσωπο' : 'Μέλι' , 'επιδεξιότητα' : 'ζωγραφική' , 'βραβείο' : 25000 },

{ 'ταυτότητα' : 112 , 'πρόσωπο' : 'Mouni' , 'επιδεξιότητα' : 'μουσική/χορός' , 'βραβείο' : 2000 },

{ 'ταυτότητα' : 153 , 'πρόσωπο' : 'Tulasi' , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' , 'βραβείο' : 1200 },

{ 'ταυτότητα' : 173 , 'πρόσωπο' : 'Ετρεξα' , 'επιδεξιότητα' : 'ΜΟΥΣΙΚΗ' , 'βραβείο' : 2000 },

{ 'ταυτότητα' : 43 , 'πρόσωπο' : 'Καμάλα' , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' , 'βραβείο' : 10000 }

]

# δημιουργήστε το πλαίσιο δεδομένων δεξιοτήτων από τα παραπάνω δεδομένα

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Πραγματικά δεδομένα δεξιοτήτων

skills_df.show()

# Μετατροπή σε πίνακα JSON

json_skills_data = skills_df.toJSON().collect()

print(json_skills_data)

Παραγωγή:

+---+------+-----+-----------+

| ταυτότητα|άτομο|βραβείο| δεξιότητα|

+---+------+-----+-----------+

| 123 | Μέλι| 25000 | ζωγραφική|

| 112 | Mouni| 2000 |μουσική/χορός|

| 153 |Tulasi| 1200 | ανάγνωση|

| 173 | Έτρεξε| 2000 | μουσική |

| 43 |Καμάλα| 10000 | ανάγνωση|

+---+------+-----+-----------+

[ '{'id':123,'person':'Honey','prize':25000,'skill':'painting'}' , '{'id':112,'person':'Mouni','prize':2000,'skill':'music/dance'}' , '{'id':153,'person':'Tulasi','prize':1200,'skill':'reading'}' , '{'id':173,'person':'Ran','prize':2000,'skill':'music'}' , '{'id':43,'person':'Kamala','prize':10000,'skill':'reading'}' ]

Υπάρχουν 5 σειρές στο PySpark DataFrame. Όλες αυτές οι 5 σειρές επιστρέφονται ως λεξικό συμβολοσειρών που χωρίζονται με κόμμα.

PySpark DataFrame σε JSON χρησιμοποιώντας Write.json()

Η μέθοδος write.json() είναι διαθέσιμη στο PySpark που εγγράφει/αποθηκεύει το PySpark DataFrame σε ένα αρχείο JSON. Παίρνει το όνομα/διαδρομή του αρχείου ως παράμετρο. Βασικά, επιστρέφει το JSON σε πολλαπλά αρχεία (διαμερισμένα αρχεία). Για να τα συγχωνεύσουμε όλα σε ένα μόνο αρχείο, μπορούμε να χρησιμοποιήσουμε τη μέθοδο coalesce().

Σύνταξη:

dataframe_object.coalesce( 1 ).write.json('όνομα_αρχείου')
  1. Λειτουργία προσθήκης - dataframe_object.write.mode('append').json('file_name')
  2. Λειτουργία αντικατάστασης – dataframe_object.write.mode('overwrite').json('file_name')

Μπορεί να είναι δυνατή η προσάρτηση/αντικατάσταση του υπάρχοντος JSON. Χρησιμοποιώντας το write.mode(), μπορούμε να προσαρτήσουμε τα δεδομένα περνώντας το 'append' ή να αντικαταστήσουμε τα υπάρχοντα δεδομένα JSON περνώντας το 'overwrite' σε αυτήν τη συνάρτηση.

Παράδειγμα 1:

Δημιουργήστε ένα PySpark DataFrame 'skills_df' με 3 σειρές και 4 στήλες. Γράψτε αυτό το DataFrame σε JSON.

εισαγωγή pyspark

εισαγωγή παντα

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# δεδομένα δεξιοτήτων με 3 σειρές και 4 στήλες

δεξιότητες =[{ 'ταυτότητα' : 123 , 'πρόσωπο' : 'Μέλι' , 'επιδεξιότητα' : 'ζωγραφική' , 'βραβείο' : 25000 },

{ 'ταυτότητα' : 112 , 'πρόσωπο' : 'Mouni' , 'επιδεξιότητα' : 'χορός' , 'βραβείο' : 2000 },

{ 'ταυτότητα' : 153 , 'πρόσωπο' : 'Tulasi' , 'επιδεξιότητα' : 'ΑΝΑΓΝΩΣΗ' , 'βραβείο' : 1200 }

]

# δημιουργήστε το πλαίσιο δεδομένων δεξιοτήτων από τα παραπάνω δεδομένα

skills_df = linuxhint_spark_app.createDataFrame(skills)

# write.json()

skills_df.coalesce( 1 ).write.json( 'δεδομένα_δεξιοτήτων' )

Αρχείο JSON:

Μπορούμε να δούμε ότι ο φάκελος skills_data περιλαμβάνει τα χωρισμένα δεδομένα JSON.

Ας ανοίξουμε το αρχείο JSON. Μπορούμε να δούμε ότι όλες οι σειρές από το PySpark DataFrame μετατρέπονται σε JSON.

Υπάρχουν 5 σειρές στο PySpark DataFrame. Όλες αυτές οι 5 σειρές επιστρέφονται ως λεξικό συμβολοσειρών που χωρίζονται με κόμμα.

Παράδειγμα 2:

Δημιουργήστε ένα PySpark DataFrame 'skills2_df' με μία σειρά. Προσθέστε μια σειρά στο προηγούμενο αρχείο JSON καθορίζοντας τη λειτουργία ως 'προσάρτηση'.

εισαγωγή pyspark

εισάγετε πάντα

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

δεξιότητες2 =[{ 'ταυτότητα' : 78 , 'πρόσωπο' : 'Μαρία' , 'επιδεξιότητα' : 'ιππασία' , 'βραβείο' : 8960 }

]

# δημιουργήστε το πλαίσιο δεδομένων δεξιοτήτων από τα παραπάνω δεδομένα

skills2_df = linuxhint_spark_app.createDataFrame(skills2)

# write.json() με λειτουργία προσάρτησης.

skills2_df.write.mode( 'προσαρτώ' ).json( 'δεδομένα_δεξιοτήτων' )

Αρχείο JSON:

Μπορούμε να δούμε τα χωρισμένα αρχεία JSON. Το πρώτο αρχείο περιέχει τις πρώτες εγγραφές DataFrame και το δεύτερο αρχείο περιέχει τη δεύτερη εγγραφή DataFrame.

συμπέρασμα

Υπάρχουν τρεις διαφορετικοί τρόποι μετατροπής του PySpark DataFrame σε JSON. Αρχικά, συζητήσαμε τη μέθοδο to_json() που μετατρέπεται σε JSON μετατρέποντας το PySpark DataFrame στο Pandas DataFrame με διαφορετικά παραδείγματα λαμβάνοντας υπόψη διαφορετικές παραμέτρους. Στη συνέχεια, χρησιμοποιήσαμε τη μέθοδο toJSON(). Τέλος, μάθαμε πώς να χρησιμοποιούμε τη συνάρτηση write.json() για να γράψουμε το PySpark DataFrame σε JSON. Η προσάρτηση και η αντικατάσταση είναι δυνατή με αυτήν τη λειτουργία.