PySpark Read JSON()

Pyspark Read Json



Κατά την εργασία με το PySpark DataFrames, πρέπει να αποθηκευτεί στο PySpark DataFrame εάν θέλετε να επεξεργαστείτε τα δεδομένα JSON. Μετά την αποθήκευση στο DataFrame, μπορούμε να εφαρμόσουμε τις διαφορετικές λειτουργίες και μεθόδους στα δεδομένα. Επίσης, υπάρχουν τόσα πολλά πλεονεκτήματα αν μετατρέψουμε το JSON σε PySpark DataFrame αφού είναι απλό και μπορούμε να μετατρέψουμε/διαμερίσουμε τα δεδομένα με πιο απλό τρόπο.

Θέμα Περιεχομένων:

Ανάγνωση JSON στο PySpark DataFrame χρησιμοποιώντας Pandas.read_json()







Ανάγνωση JSON σε PySpark DataFrame χρησιμοποιώντας Spark.read.json()



Ανάγνωση JSON σε PySpark DataFrame με χρήση του PySpark SQL



Σε αυτό το σεμινάριο, θα δούμε πώς να διαβάζουμε το JSON στο PySpark DataFrame χρησιμοποιώντας τα pandas.read_json(), spark.read.json() και spark.sql. Σε όλα τα σενάρια, θα εξετάσουμε τα διαφορετικά παραδείγματα λαμβάνοντας υπόψη τις διαφορετικές μορφές JSON.





Εγκαταστήστε τη βιβλιοθήκη PySpark πριν εφαρμόσετε τα ακόλουθα παραδείγματα.

pip εγκατάσταση pyspark

Μετά την επιτυχή εγκατάσταση, μπορείτε να δείτε την έξοδο ως εξής:



Ανάγνωση JSON στο PySpark DataFrame χρησιμοποιώντας Pandas.read_json()

Στο PySpark, η μέθοδος createDataFrame() χρησιμοποιείται για την απευθείας δημιουργία του DataFrame. Εδώ, χρειάζεται απλώς να περάσουμε το αρχείο/διαδρομή JSON στο αρχείο JSON μέσω της μεθόδου pandas.read_json(). Αυτή η μέθοδος read_json() παίρνει το όνομα/διαδρομή αρχείου που είναι διαθέσιμο στη λειτουργική μονάδα Pandas. Αυτός είναι ο λόγος για τον οποίο είναι απαραίτητο να εισαγάγετε και να χρησιμοποιήσετε τη μονάδα Pandas.

Σύνταξη:

spark_app.createDataFrame(pandas.read_json( 'file_name.json' ))

Παράδειγμα:

Ας δημιουργήσουμε ένα αρχείο JSON με το όνομα 'student_skill.json' που περιέχει 2 εγγραφές. Εδώ, τα κλειδιά/στήλες είναι 'Μαθητής 1' και 'Μαθητής 2'. Οι σειρές είναι το όνομα, η ηλικία, η ικανότητα1 και η ικανότητα2.

εισαγωγή pyspark

εισάγετε πάντα

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# Χρήση pandas.read_json()

kandidat_skills = linuxhint_spark_app.createDataFrame(pandas.read_json( 'student_skill.json' ))

kandidat_skills.show()

Παραγωγή:

Μπορούμε να δούμε ότι τα δεδομένα JSON μετατρέπονται σε PySpark DataFrame με καθορισμένες στήλες και γραμμές.

2. Ανάγνωση JSON σε PySpark DataFrame χρησιμοποιώντας Spark.read.json()

Η read.json() είναι μια μέθοδος παρόμοια με την read_json() στα Pandas. Εδώ, η read.json() παίρνει μια διαδρομή προς το JSON ή απευθείας στο αρχείο JSON και το φορτώνει απευθείας στο PySpark DataFrame. Δεν χρειάζεται να χρησιμοποιήσετε τη μέθοδο createDataFrame() σε αυτό το σενάριο. Εάν θέλετε να διαβάζετε πολλά αρχεία JSON ταυτόχρονα, πρέπει να περάσουμε μια λίστα ονομάτων αρχείων JSON μέσω μιας λίστας που χωρίζεται με κόμμα. Όλες οι εγγραφές JSON αποθηκεύονται σε ένα DataFrame.

Σύνταξη:

Μεμονωμένο αρχείο - spark_app.read.json( 'file_name.json' )

Πολλαπλά αρχεία - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])

Σενάριο 1: Διαβάστε το JSON έχοντας μία γραμμή

Εάν το αρχείο JSON είναι σε μορφές record1, record2, record3… (μονής γραμμής), μπορούμε να το ονομάσουμε JSON με μεμονωμένες γραμμές. Το Spark επεξεργάζεται αυτές τις εγγραφές και τις αποθηκεύει στο PySpark DataFrame ως σειρές. Κάθε εγγραφή είναι μια σειρά στο PySpark DataFrame.

Ας δημιουργήσουμε ένα αρχείο JSON με το όνομα 'candidate_skills.json' που περιέχει 3 εγγραφές. Διαβάστε αυτό το JSON στο PySpark DataFrame.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# Διαβάστε το kandidat_skills.json στο PySpark DataFrame

kandidat_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )

kandidat_skills.show()

Παραγωγή:

Μπορούμε να δούμε ότι τα δεδομένα JSON μετατρέπονται σε PySpark DataFrame με καθορισμένες εγγραφές και ονόματα στηλών.

Σενάριο 2: Διαβάστε το JSON με πολλές γραμμές

Εάν το αρχείο JSON έχει πολλές γραμμές, πρέπει να χρησιμοποιήσετε τη μέθοδο read.option().json() για να μεταβιβάσετε την παράμετρο πολλαπλών γραμμών που πρέπει να οριστεί σε true. Αυτό μας επιτρέπει να φορτώσουμε JSON με πολλές γραμμές στο PySpark DataFrame.

read.option( 'πολυγραμμή' , 'αληθής' ).json( 'file_name.json' )

Ας δημιουργήσουμε ένα αρχείο JSON με το όνομα 'multi.json' που περιέχει 3 εγγραφές. Διαβάστε αυτό το JSON στο PySpark DataFrame.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# Διαβάστε το multi.json (με πολλές γραμμές) στο PySpark DataFrame

kandidat_skills = linuxhint_spark_app.read.option( 'πολυγραμμή' , 'αληθής' ).json( 'multi.json' )

kandidat_skills.show()

Παραγωγή:

Σενάριο 3: Ανάγνωση πολλαπλών JSON

Έχουμε ήδη συζητήσει στην αρχική φάση αυτού του σεμιναρίου σχετικά με πολλά αρχεία JSON. Εάν θέλετε να διαβάσετε πολλά αρχεία JSON ταυτόχρονα και να τα αποθηκεύσετε σε ένα μόνο PySpark DataFrame, πρέπει να περάσουμε μια λίστα ονομάτων αρχείων στη μέθοδο read.json().

Ας δημιουργήσουμε δύο αρχεία JSON με τα ονόματα 'candidate_skills.json' και 'candidate_skills2.json' και ας τα φορτώσουμε στο PySpark DataFrame.

Το αρχείο 'candidate_skills.json' περιέχει τρεις εγγραφές.

Το αρχείο 'candidate_skill2.json' περιέχει μόνο μία εγγραφή.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# Ανάγνωση των αρχείων kandidat_skills και kandidat_skills2 κάθε φορά στο PySpark DataFrame

kandidat_skills = linuxhint_spark_app.read.json([ 'candidate_skills.json' , 'candidate_skills2.json' ])

kandidat_skills.show()

Παραγωγή:

Τέλος, το DataFrame κατέχει τέσσερις εγγραφές. Οι τρεις πρώτες εγγραφές ανήκουν στο πρώτο JSON και οι τελευταίες στο δεύτερο JSON.

Ανάγνωση JSON σε PySpark DataFrame χρησιμοποιώντας Spark.read.json()

Η read.json() είναι μια μέθοδος παρόμοια με την read_json() στα Pandas. Εδώ, το read.json() παίρνει μια διαδρομή προς το JSON ή απευθείας στο αρχείο JSON και το φορτώνει απευθείας στο PySpark DataFrame. Δεν χρειάζεται να χρησιμοποιήσετε τη μέθοδο createDataFrame() σε αυτό το σενάριο. Εάν θέλετε να διαβάζετε πολλά αρχεία JSON ταυτόχρονα, πρέπει να περάσουμε μια λίστα ονομάτων αρχείων JSON μέσω μιας λίστας που χωρίζεται με κόμμα. Όλες οι εγγραφές JSON αποθηκεύονται σε ένα DataFrame.

Σύνταξη:

Μεμονωμένο αρχείο - spark_app.read.json( 'file_name.json' )

Πολλαπλά αρχεία - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])

Σενάριο 1: Διαβάστε το JSON έχοντας μία γραμμή

Εάν το αρχείο JSON είναι σε μορφή record1, record2, record3… (μονής γραμμής), μπορούμε να το ονομάσουμε JSON με μεμονωμένες γραμμές. Το Spark επεξεργάζεται αυτές τις εγγραφές και τις αποθηκεύει στο PySpark DataFrame ως σειρές. Κάθε εγγραφή είναι μια σειρά στο PySpark DataFrame.

Ας δημιουργήσουμε ένα αρχείο JSON με το όνομα 'candidate_skills.json' που περιέχει 3 εγγραφές. Διαβάστε αυτό το JSON στο PySpark DataFrame.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# Διαβάστε το kandidat_skills.json στο PySpark DataFrame

kandidat_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )

kandidat_skills.show()

Παραγωγή:

Μπορούμε να δούμε ότι τα δεδομένα JSON μετατρέπονται σε PySpark DataFrame με καθορισμένες εγγραφές και ονόματα στηλών.

Ανάγνωση JSON σε PySpark DataFrame με χρήση του PySpark SQL

Μπορεί να είναι δυνατή η δημιουργία μιας προσωρινής προβολής των δεδομένων μας JSON χρησιμοποιώντας το PySpark SQL. Άμεσα, μπορούμε να παρέχουμε το JSON τη στιγμή της δημιουργίας της προσωρινής προβολής. Δείτε την παρακάτω σύνταξη. Μετά από αυτό, μπορούμε να χρησιμοποιήσουμε την εντολή SELECT για να εμφανίσουμε το PySpark DataFrame.

Σύνταξη:

spark_app.sql( 'ΔΗΜΙΟΥΡΓΙΑ ΠΡΟΣΩΡΙΝΗΣ ΠΡΟΒΟΛΗ VIEW_NAME ΜΕ ΧΡΗΣΗ ΕΠΙΛΟΓΩΝ json (διαδρομή 'file_name.json')' )

Εδώ, το 'VIEW_NAME' είναι η προβολή των δεδομένων JSON και το 'όνομα_αρχείου' είναι το όνομα του αρχείου JSON.

Παράδειγμα 1:

Εξετάστε το αρχείο JSON που χρησιμοποιείται στα προηγούμενα παραδείγματα - 'candidate_skills.json'. Επιλέξτε όλες τις σειρές από το DataFrame χρησιμοποιώντας SELECT με τον τελεστή '*'. Εδώ, το * επιλέγει όλες τις στήλες από το PySpark DataFrame.

εισαγωγή pyspark

εισάγετε πάντα

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# Χρησιμοποιώντας το spark.sql για τη δημιουργία VIEW από το JSON

kandidat_skills = linuxhint_spark_app.sql( 'ΔΗΜΙΟΥΡΓΙΑ ΠΡΟΣΩΡΙΝΗΣ ΠΡΟΒΟΛΗΣ Candidate_data ΜΕ ΧΡΗΣΗ ΕΠΙΛΟΓΩΝ json (διαδρομή 'candidate_skills.json')' )

# Χρησιμοποιήστε το ερώτημα SELECT για να επιλέξετε όλες τις εγγραφές από το Candidate_data.

linuxhint_spark_app.sql( 'SELECT * from Candidate_data' ).προβολή()

Παραγωγή:

Ο συνολικός αριθμός εγγραφών στο PySpark DataFrame (διαβάστηκε από JSON) είναι 3.

Παράδειγμα 2:

Τώρα, φιλτράρετε τις εγγραφές στο PySpark DataFrame με βάση τη στήλη ηλικία. Χρησιμοποιήστε τον τελεστή 'μεγαλύτερο από' στην ηλικία για να λάβετε τις σειρές με ηλικία μεγαλύτερη από 22.

# Χρησιμοποιήστε το ερώτημα SELECT για να επιλέξετε εγγραφές με ηλικία > 22 ετών.

linuxhint_spark_app.sql( 'SELECT * from Candidate_data όπου ηλικία>22' ).προβολή()

Παραγωγή:

Υπάρχει μόνο μία εγγραφή στο PySpark DataFrame με ηλικία μεγαλύτερη από 22.

συμπέρασμα

Μάθαμε τους τρεις διαφορετικούς τρόπους ανάγνωσης του JSON στο PySpark DataFrame. Αρχικά, μάθαμε πώς να χρησιμοποιούμε τη μέθοδο read_json() που είναι διαθέσιμη στη λειτουργική μονάδα Pandas για την ανάγνωση JSON στο PySpark DataFrame. Στη συνέχεια, μάθαμε πώς να διαβάζουμε τα αρχεία JSON μίας/πολλαπλής γραμμής χρησιμοποιώντας τη μέθοδο spark.read.json() με option(). Για να διαβάσουμε πολλά αρχεία JSON ταυτόχρονα, πρέπει να περάσουμε μια λίστα ονομάτων αρχείων σε αυτήν τη μέθοδο. Χρησιμοποιώντας το PySpark SQL, το αρχείο JSON διαβάζεται στην προσωρινή προβολή και το DataFrame εμφανίζεται χρησιμοποιώντας το ερώτημα SELECT.