Σε αυτόν τον οδηγό, θα επικεντρωθούμε κυρίως στην ανάγνωση/φόρτωση του αρχείου παρκέ στο PySpark DataFrame/SQL χρησιμοποιώντας τη συνάρτηση read.parquet() που είναι διαθέσιμη στην κλάση pyspark.sql.DataFrameReader.
Θέμα Περιεχομένων:
Διαβάστε το αρχείο Parquet στο PySpark DataFrame
Διαβάστε το αρχείο Parquet στο PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
Αυτή η συνάρτηση χρησιμοποιείται για την ανάγνωση του αρχείου παρκέ και τη φόρτωσή του στο PySpark DataFrame. Παίρνει τη διαδρομή/όνομα αρχείου του αρχείου παρκέ. Μπορούμε απλά να χρησιμοποιήσουμε τη συνάρτηση read.parquet() αφού αυτή είναι η γενική συνάρτηση.
Σύνταξη:
Ας δούμε τη σύνταξη του read.parquet():
spark_app.read.parquet(file_name.parquet/path)Αρχικά, εγκαταστήστε τη λειτουργική μονάδα PySpark χρησιμοποιώντας την εντολή pip:
pip εγκατάσταση pyspark
Αποκτήστε το αρχείο παρκέ
Για να διαβάσετε ένα αρχείο παρκέ, χρειάζεστε τα δεδομένα στα οποία δημιουργείται το αρχείο παρκέ από αυτά τα δεδομένα. Σε αυτό το μέρος, θα δούμε πώς να δημιουργήσετε ένα αρχείο παρκέ από το PySpark DataFrame.
Ας δημιουργήσουμε ένα PySpark DataFrame με 5 εγγραφές και ας το γράψουμε στο αρχείο παρκέ 'industry_parquet'.
εισαγωγή pysparkαπό το pyspark.sql εισαγωγή SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# δημιουργήστε το πλαίσιο δεδομένων που αποθηκεύει τις λεπτομέρειες του κλάδου
industrial_df = linuxhint_spark_app.createDataFrame([Σειρά(Τύπος= 'Γεωργία' ,Περιοχή= 'ΗΠΑ' ,
Βαθμολογία= 'Ζεστό' ,Σύνολο_εργαζομένων= 100 ),
Σειρά(Τύπος= 'Γεωργία' ,Περιοχή= 'Ινδία' ,Βαθμολογία= 'Ζεστό' ,Σύνολο_εργαζομένων= 200 ),
Σειρά(Τύπος= 'Ανάπτυξη' ,Περιοχή= 'ΗΠΑ' ,Βαθμολογία= 'Ζεστός' ,Σύνολο_εργαζομένων= 100 ),
Σειρά(Τύπος= 'Εκπαίδευση' ,Περιοχή= 'ΗΠΑ' ,Βαθμολογία= 'Δροσερός' ,Σύνολο_εργαζομένων= 400 ),
Σειρά(Τύπος= 'Εκπαίδευση' ,Περιοχή= 'ΗΠΑ' ,Βαθμολογία= 'Ζεστός' ,Σύνολο_εργαζομένων= είκοσι )
])
# Πραγματικό DataFrame
industrial_df.show()
# Γράψτε το industrial_df στο αρχείο παρκέ
industrial_df.coalesce( 1 ).γράψτε.παρκέ( 'industry_parquet' )
Παραγωγή:
Αυτό είναι το DataFrame που περιέχει 5 εγγραφές.
Δημιουργείται ένα αρχείο παρκέ για το προηγούμενο DataFrame. Εδώ, το όνομα του αρχείου μας με επέκταση είναι 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'. Χρησιμοποιούμε αυτό το αρχείο σε ολόκληρο το σεμινάριο.
Διαβάστε το αρχείο Parquet στο PySpark DataFrame
Έχουμε τη λίμα παρκέ. Ας διαβάσουμε αυτό το αρχείο χρησιμοποιώντας τη συνάρτηση read.parquet() και ας το φορτώσουμε στο PySpark DataFrame.
εισαγωγή pysparkαπό το pyspark.sql εισαγωγή SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# Διαβάστε το αρχείο παρκέ στο αντικείμενο dataframe_from_parquet.
dataframe_from_parquet=linuzhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Εμφάνιση του dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Παραγωγή:
Εμφανίζουμε το DataFrame χρησιμοποιώντας τη μέθοδο show() που δημιουργήθηκε από το αρχείο παρκέ.
Ερωτήματα SQL με αρχείο Parquet
Μετά τη φόρτωση στο DataFrame, μπορεί να είναι δυνατή η δημιουργία των πινάκων SQL και η εμφάνιση των δεδομένων που υπάρχουν στο DataFrame. Πρέπει να δημιουργήσουμε μια ΠΡΟΣΩΡΙΝΗ ΠΡΟΒΟΛΗ και να χρησιμοποιήσουμε τις εντολές SQL για να επιστρέψουμε τις εγγραφές από το DataFrame που δημιουργείται από το αρχείο παρκέ.
Παράδειγμα 1:
Δημιουργήστε μια προσωρινή προβολή με το όνομα 'Sectors' και χρησιμοποιήστε την εντολή SELECT για να εμφανίσετε τις εγγραφές στο DataFrame. Μπορείτε να ανατρέξετε σε αυτό φροντιστήριο που εξηγεί πώς να δημιουργήσετε ένα VIEW στο Spark – SQL.
εισαγωγή pysparkαπό το pyspark.sql εισαγωγή SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# Διαβάστε το αρχείο παρκέ στο αντικείμενο dataframe_from_parquet.
dataframe_from_parquet=linuzhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Δημιουργία προβολής από το παραπάνω αρχείο παρκέ με το όνομα - 'Sectors'
dataframe_from_parquet.createOrReplaceTempView( 'Τομείς' )
# Ερώτημα για εμφάνιση όλων των εγγραφών από τους Τομείς
linuxhint_spark_app.sql( 'επιλογή * από Τομείς' ).προβολή()
Παραγωγή:
Παράδειγμα 2:
Χρησιμοποιώντας την προηγούμενη VIEW, γράψτε το ερώτημα SQL:
- Για εμφάνιση όλων των εγγραφών από τους Τομείς που ανήκουν στην 'Ινδία'.
- Για να εμφανίσετε όλες τις εγγραφές από τους Τομείς με έναν υπάλληλο που είναι μεγαλύτερος από 100.
linuxhint_spark_app.sql( 'επιλέξτε * από Τομείς όπου Περιοχή='Ινδία'' ).προβολή()
# Ερώτημα για εμφάνιση όλων των εγγραφών από τους Τομείς με εργαζόμενο μεγαλύτερο από 100
linuxhint_spark_app.sql( 'επιλέξτε * από Τομείς όπου Total_employees>100' ).προβολή()
Παραγωγή:
Υπάρχει μόνο ένα ρεκόρ με περιοχή που είναι η «Ινδία» και δύο ρεκόρ με υπαλλήλους που είναι μεγαλύτεροι από 100.
Διαβάστε το αρχείο Parquet στο PySpark SQL
Αρχικά, πρέπει να δημιουργήσουμε ένα VIEW χρησιμοποιώντας την εντολή CREATE. Χρησιμοποιώντας τη λέξη-κλειδί «διαδρομή» στο ερώτημα SQL, μπορούμε να διαβάσουμε το αρχείο παρκέ στο Spark SQL. Μετά τη διαδρομή, πρέπει να καθορίσουμε το όνομα αρχείου/την τοποθεσία του αρχείου.
Σύνταξη:
spark_app.sql( 'ΔΗΜΙΟΥΡΓΙΑ ΠΡΟΣΩΡΙΝΗ ΠΡΟΒΟΛΗ view_name ΜΕ ΧΡΗΣΗ ΕΠΙΛΟΓΩΝ παρκέ (διαδρομή ' όνομα_αρχείου.παρκέ ')' )Παράδειγμα 1:
Δημιουργήστε μια προσωρινή προβολή με το όνομα 'Sector2' και διαβάστε το αρχείο παρκέ σε αυτήν. Χρησιμοποιώντας τη συνάρτηση sql(), γράψτε το ερώτημα επιλογής για να εμφανίσετε όλες τις εγγραφές που υπάρχουν στην προβολή.
εισαγωγή pysparkαπό το pyspark.sql εισαγωγή SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()
# Διαβάστε το αρχείο παρκέ στο Spark-SQL
linuxhint_spark_app.sql( 'ΔΗΜΙΟΥΡΓΙΑ ΠΡΟΣΩΡΙΝΗ ΠΡΟΒΟΛΗ Τομέα 2 ΜΕ ΧΡΗΣΗ ΕΠΙΛΟΓΩΝ παρκέ (διαδρομή ' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# Ερώτημα για εμφάνιση όλων των εγγραφών από το Sector2
linuxhint_spark_app.sql( 'επιλογή * από το Sector2' ).προβολή()
Παραγωγή:
Παράδειγμα 2:
Χρησιμοποιήστε την προηγούμενη ΠΡΟΒΟΛΗ και γράψτε το ερώτημα για να εμφανίσετε όλες τις εγγραφές με τη βαθμολογία 'Hot' ή 'Cool'.
# Ερώτημα για εμφάνιση όλων των εγγραφών από το Sector2 με Rating- Hot ή Cool.linuxhint_spark_app.sql( 'επιλέξτε * από τον Τομέα 2 όπου Βαθμολογία='Ζεστό' Ή Βαθμολογία='Δροσερό'' ).προβολή()
Παραγωγή:
Υπάρχουν τρεις δίσκοι με βαθμολογία 'Hot' ή 'Cool'.
συμπέρασμα
Στο PySpark, η συνάρτηση write.parquet() γράφει το DataFrame στο αρχείο παρκέ. Η συνάρτηση read.parquet() διαβάζει το αρχείο παρκέ στο PySpark DataFrame ή σε οποιοδήποτε άλλο DataSource. Μάθαμε πώς να διαβάζουμε το αρχείο παρκέ στο PySpark DataFrame και στον πίνακα PySpark. Ως μέρος αυτού του σεμιναρίου, συζητήσαμε επίσης πώς να δημιουργήσετε τους πίνακες από το PySpark DataFrame και να φιλτράρετε τα δεδομένα χρησιμοποιώντας τον όρο WHERE.