Θέμα Περιεχομένων:
Ανάγνωση JSON στο PySpark DataFrame χρησιμοποιώντας Pandas.read_json()
Ανάγνωση JSON σε PySpark DataFrame χρησιμοποιώντας Spark.read.json()
Ανάγνωση JSON σε PySpark DataFrame με χρήση του PySpark SQL
Σε αυτό το σεμινάριο, θα δούμε πώς να διαβάζουμε το JSON στο PySpark DataFrame χρησιμοποιώντας τα pandas.read_json(), spark.read.json() και spark.sql. Σε όλα τα σενάρια, θα εξετάσουμε τα διαφορετικά παραδείγματα λαμβάνοντας υπόψη τις διαφορετικές μορφές JSON.
Εγκαταστήστε τη βιβλιοθήκη PySpark πριν εφαρμόσετε τα ακόλουθα παραδείγματα.
pip εγκατάσταση pysparkΜετά την επιτυχή εγκατάσταση, μπορείτε να δείτε την έξοδο ως εξής:
Ανάγνωση JSON στο PySpark DataFrame χρησιμοποιώντας Pandas.read_json()
Στο PySpark, η μέθοδος createDataFrame() χρησιμοποιείται για την απευθείας δημιουργία του DataFrame. Εδώ, χρειάζεται απλώς να περάσουμε το αρχείο/διαδρομή JSON στο αρχείο JSON μέσω της μεθόδου pandas.read_json(). Αυτή η μέθοδος read_json() παίρνει το όνομα/διαδρομή αρχείου που είναι διαθέσιμο στη λειτουργική μονάδα Pandas. Αυτός είναι ο λόγος για τον οποίο είναι απαραίτητο να εισαγάγετε και να χρησιμοποιήσετε τη μονάδα Pandas.
Σύνταξη:
spark_app.createDataFrame(pandas.read_json( 'file_name.json' ))Παράδειγμα:
Ας δημιουργήσουμε ένα αρχείο JSON με το όνομα 'student_skill.json' που περιέχει 2 εγγραφές. Εδώ, τα κλειδιά/στήλες είναι 'Μαθητής 1' και 'Μαθητής 2'. Οι σειρές είναι το όνομα, η ηλικία, η ικανότητα1 και η ικανότητα2.
εισαγωγή pyspark
εισάγετε πάντα
από το pyspark.sql εισαγωγή SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# Χρήση pandas.read_json()
kandidat_skills = linuxhint_spark_app.createDataFrame(pandas.read_json( 'student_skill.json' ))
kandidat_skills.show()
Παραγωγή:
Μπορούμε να δούμε ότι τα δεδομένα JSON μετατρέπονται σε PySpark DataFrame με καθορισμένες στήλες και γραμμές.
2. Ανάγνωση JSON σε PySpark DataFrame χρησιμοποιώντας Spark.read.json()
Η read.json() είναι μια μέθοδος παρόμοια με την read_json() στα Pandas. Εδώ, η read.json() παίρνει μια διαδρομή προς το JSON ή απευθείας στο αρχείο JSON και το φορτώνει απευθείας στο PySpark DataFrame. Δεν χρειάζεται να χρησιμοποιήσετε τη μέθοδο createDataFrame() σε αυτό το σενάριο. Εάν θέλετε να διαβάζετε πολλά αρχεία JSON ταυτόχρονα, πρέπει να περάσουμε μια λίστα ονομάτων αρχείων JSON μέσω μιας λίστας που χωρίζεται με κόμμα. Όλες οι εγγραφές JSON αποθηκεύονται σε ένα DataFrame.
Σύνταξη:
Μεμονωμένο αρχείο - spark_app.read.json( 'file_name.json' )Πολλαπλά αρχεία - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])
Σενάριο 1: Διαβάστε το JSON έχοντας μία γραμμή
Εάν το αρχείο JSON είναι σε μορφές record1, record2, record3… (μονής γραμμής), μπορούμε να το ονομάσουμε JSON με μεμονωμένες γραμμές. Το Spark επεξεργάζεται αυτές τις εγγραφές και τις αποθηκεύει στο PySpark DataFrame ως σειρές. Κάθε εγγραφή είναι μια σειρά στο PySpark DataFrame.
Ας δημιουργήσουμε ένα αρχείο JSON με το όνομα 'candidate_skills.json' που περιέχει 3 εγγραφές. Διαβάστε αυτό το JSON στο PySpark DataFrame.
εισαγωγή pyspark
από το pyspark.sql εισαγωγή SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# Διαβάστε το kandidat_skills.json στο PySpark DataFrame
kandidat_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )
kandidat_skills.show()
Παραγωγή:
Μπορούμε να δούμε ότι τα δεδομένα JSON μετατρέπονται σε PySpark DataFrame με καθορισμένες εγγραφές και ονόματα στηλών.
Σενάριο 2: Διαβάστε το JSON με πολλές γραμμές
Εάν το αρχείο JSON έχει πολλές γραμμές, πρέπει να χρησιμοποιήσετε τη μέθοδο read.option().json() για να μεταβιβάσετε την παράμετρο πολλαπλών γραμμών που πρέπει να οριστεί σε true. Αυτό μας επιτρέπει να φορτώσουμε JSON με πολλές γραμμές στο PySpark DataFrame.
read.option( 'πολυγραμμή' , 'αληθής' ).json( 'file_name.json' )Ας δημιουργήσουμε ένα αρχείο JSON με το όνομα 'multi.json' που περιέχει 3 εγγραφές. Διαβάστε αυτό το JSON στο PySpark DataFrame.
εισαγωγή pyspark
από το pyspark.sql εισαγωγή SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# Διαβάστε το multi.json (με πολλές γραμμές) στο PySpark DataFrame
kandidat_skills = linuxhint_spark_app.read.option( 'πολυγραμμή' , 'αληθής' ).json( 'multi.json' )
kandidat_skills.show()
Παραγωγή:
Σενάριο 3: Ανάγνωση πολλαπλών JSON
Έχουμε ήδη συζητήσει στην αρχική φάση αυτού του σεμιναρίου σχετικά με πολλά αρχεία JSON. Εάν θέλετε να διαβάσετε πολλά αρχεία JSON ταυτόχρονα και να τα αποθηκεύσετε σε ένα μόνο PySpark DataFrame, πρέπει να περάσουμε μια λίστα ονομάτων αρχείων στη μέθοδο read.json().
Ας δημιουργήσουμε δύο αρχεία JSON με τα ονόματα 'candidate_skills.json' και 'candidate_skills2.json' και ας τα φορτώσουμε στο PySpark DataFrame.
Το αρχείο 'candidate_skills.json' περιέχει τρεις εγγραφές.
Το αρχείο 'candidate_skill2.json' περιέχει μόνο μία εγγραφή.
εισαγωγή pyspark
από το pyspark.sql εισαγωγή SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# Ανάγνωση των αρχείων kandidat_skills και kandidat_skills2 κάθε φορά στο PySpark DataFrame
kandidat_skills = linuxhint_spark_app.read.json([ 'candidate_skills.json' , 'candidate_skills2.json' ])
kandidat_skills.show()
Παραγωγή:
Τέλος, το DataFrame κατέχει τέσσερις εγγραφές. Οι τρεις πρώτες εγγραφές ανήκουν στο πρώτο JSON και οι τελευταίες στο δεύτερο JSON.
Ανάγνωση JSON σε PySpark DataFrame χρησιμοποιώντας Spark.read.json()
Η read.json() είναι μια μέθοδος παρόμοια με την read_json() στα Pandas. Εδώ, το read.json() παίρνει μια διαδρομή προς το JSON ή απευθείας στο αρχείο JSON και το φορτώνει απευθείας στο PySpark DataFrame. Δεν χρειάζεται να χρησιμοποιήσετε τη μέθοδο createDataFrame() σε αυτό το σενάριο. Εάν θέλετε να διαβάζετε πολλά αρχεία JSON ταυτόχρονα, πρέπει να περάσουμε μια λίστα ονομάτων αρχείων JSON μέσω μιας λίστας που χωρίζεται με κόμμα. Όλες οι εγγραφές JSON αποθηκεύονται σε ένα DataFrame.
Σύνταξη:
Μεμονωμένο αρχείο - spark_app.read.json( 'file_name.json' )Πολλαπλά αρχεία - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])
Σενάριο 1: Διαβάστε το JSON έχοντας μία γραμμή
Εάν το αρχείο JSON είναι σε μορφή record1, record2, record3… (μονής γραμμής), μπορούμε να το ονομάσουμε JSON με μεμονωμένες γραμμές. Το Spark επεξεργάζεται αυτές τις εγγραφές και τις αποθηκεύει στο PySpark DataFrame ως σειρές. Κάθε εγγραφή είναι μια σειρά στο PySpark DataFrame.
Ας δημιουργήσουμε ένα αρχείο JSON με το όνομα 'candidate_skills.json' που περιέχει 3 εγγραφές. Διαβάστε αυτό το JSON στο PySpark DataFrame.
εισαγωγή pyspark
από το pyspark.sql εισαγωγή SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# Διαβάστε το kandidat_skills.json στο PySpark DataFrame
kandidat_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )
kandidat_skills.show()
Παραγωγή:
Μπορούμε να δούμε ότι τα δεδομένα JSON μετατρέπονται σε PySpark DataFrame με καθορισμένες εγγραφές και ονόματα στηλών.
Ανάγνωση JSON σε PySpark DataFrame με χρήση του PySpark SQL
Μπορεί να είναι δυνατή η δημιουργία μιας προσωρινής προβολής των δεδομένων μας JSON χρησιμοποιώντας το PySpark SQL. Άμεσα, μπορούμε να παρέχουμε το JSON τη στιγμή της δημιουργίας της προσωρινής προβολής. Δείτε την παρακάτω σύνταξη. Μετά από αυτό, μπορούμε να χρησιμοποιήσουμε την εντολή SELECT για να εμφανίσουμε το PySpark DataFrame.
Σύνταξη:
spark_app.sql( 'ΔΗΜΙΟΥΡΓΙΑ ΠΡΟΣΩΡΙΝΗΣ ΠΡΟΒΟΛΗ VIEW_NAME ΜΕ ΧΡΗΣΗ ΕΠΙΛΟΓΩΝ json (διαδρομή 'file_name.json')' )Εδώ, το 'VIEW_NAME' είναι η προβολή των δεδομένων JSON και το 'όνομα_αρχείου' είναι το όνομα του αρχείου JSON.
Παράδειγμα 1:
Εξετάστε το αρχείο JSON που χρησιμοποιείται στα προηγούμενα παραδείγματα - 'candidate_skills.json'. Επιλέξτε όλες τις σειρές από το DataFrame χρησιμοποιώντας SELECT με τον τελεστή '*'. Εδώ, το * επιλέγει όλες τις στήλες από το PySpark DataFrame.
εισαγωγή pysparkεισάγετε πάντα
από το pyspark.sql εισαγωγή SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# Χρησιμοποιώντας το spark.sql για τη δημιουργία VIEW από το JSON
kandidat_skills = linuxhint_spark_app.sql( 'ΔΗΜΙΟΥΡΓΙΑ ΠΡΟΣΩΡΙΝΗΣ ΠΡΟΒΟΛΗΣ Candidate_data ΜΕ ΧΡΗΣΗ ΕΠΙΛΟΓΩΝ json (διαδρομή 'candidate_skills.json')' )
# Χρησιμοποιήστε το ερώτημα SELECT για να επιλέξετε όλες τις εγγραφές από το Candidate_data.
linuxhint_spark_app.sql( 'SELECT * from Candidate_data' ).προβολή()
Παραγωγή:
Ο συνολικός αριθμός εγγραφών στο PySpark DataFrame (διαβάστηκε από JSON) είναι 3.
Παράδειγμα 2:
Τώρα, φιλτράρετε τις εγγραφές στο PySpark DataFrame με βάση τη στήλη ηλικία. Χρησιμοποιήστε τον τελεστή 'μεγαλύτερο από' στην ηλικία για να λάβετε τις σειρές με ηλικία μεγαλύτερη από 22.
# Χρησιμοποιήστε το ερώτημα SELECT για να επιλέξετε εγγραφές με ηλικία > 22 ετών.linuxhint_spark_app.sql( 'SELECT * from Candidate_data όπου ηλικία>22' ).προβολή()
Παραγωγή:
Υπάρχει μόνο μία εγγραφή στο PySpark DataFrame με ηλικία μεγαλύτερη από 22.
συμπέρασμα
Μάθαμε τους τρεις διαφορετικούς τρόπους ανάγνωσης του JSON στο PySpark DataFrame. Αρχικά, μάθαμε πώς να χρησιμοποιούμε τη μέθοδο read_json() που είναι διαθέσιμη στη λειτουργική μονάδα Pandas για την ανάγνωση JSON στο PySpark DataFrame. Στη συνέχεια, μάθαμε πώς να διαβάζουμε τα αρχεία JSON μίας/πολλαπλής γραμμής χρησιμοποιώντας τη μέθοδο spark.read.json() με option(). Για να διαβάσουμε πολλά αρχεία JSON ταυτόχρονα, πρέπει να περάσουμε μια λίστα ονομάτων αρχείων σε αυτήν τη μέθοδο. Χρησιμοποιώντας το PySpark SQL, το αρχείο JSON διαβάζεται στην προσωρινή προβολή και το DataFrame εμφανίζεται χρησιμοποιώντας το ερώτημα SELECT.