PySpark Read.Parquet()

Pyspark Read Parquet



Στο PySpark, η συνάρτηση write.parquet() γράφει το DataFrame στο αρχείο παρκέ και η read.parquet() διαβάζει το αρχείο παρκέ στο PySpark DataFrame ή σε οποιοδήποτε άλλο DataSource. Για να επεξεργαστούμε τις στήλες στο Apache Spark γρήγορα και αποτελεσματικά, πρέπει να συμπιέσουμε τα δεδομένα. Η συμπίεση δεδομένων εξοικονομεί τη μνήμη μας και όλες οι στήλες μετατρέπονται σε επίπεδο επίπεδο. Αυτό σημαίνει ότι υπάρχει χώρος αποθήκευσης σε επίπεδο στήλης. Το αρχείο που τα αποθηκεύει είναι γνωστό ως αρχείο PARQUET.

Σε αυτόν τον οδηγό, θα επικεντρωθούμε κυρίως στην ανάγνωση/φόρτωση του αρχείου παρκέ στο PySpark DataFrame/SQL χρησιμοποιώντας τη συνάρτηση read.parquet() που είναι διαθέσιμη στην κλάση pyspark.sql.DataFrameReader.

Θέμα Περιεχομένων:







Αποκτήστε το αρχείο παρκέ



Διαβάστε το αρχείο Parquet στο PySpark DataFrame



Διαβάστε το αρχείο Parquet στο PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Αυτή η συνάρτηση χρησιμοποιείται για την ανάγνωση του αρχείου παρκέ και τη φόρτωσή του στο PySpark DataFrame. Παίρνει τη διαδρομή/όνομα αρχείου του αρχείου παρκέ. Μπορούμε απλά να χρησιμοποιήσουμε τη συνάρτηση read.parquet() αφού αυτή είναι η γενική συνάρτηση.

Σύνταξη:



Ας δούμε τη σύνταξη του read.parquet():

spark_app.read.parquet(file_name.parquet/path)

Αρχικά, εγκαταστήστε τη λειτουργική μονάδα PySpark χρησιμοποιώντας την εντολή pip:

pip εγκατάσταση pyspark

Αποκτήστε το αρχείο παρκέ

Για να διαβάσετε ένα αρχείο παρκέ, χρειάζεστε τα δεδομένα στα οποία δημιουργείται το αρχείο παρκέ από αυτά τα δεδομένα. Σε αυτό το μέρος, θα δούμε πώς να δημιουργήσετε ένα αρχείο παρκέ από το PySpark DataFrame.

Ας δημιουργήσουμε ένα PySpark DataFrame με 5 εγγραφές και ας το γράψουμε στο αρχείο παρκέ 'industry_parquet'.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# δημιουργήστε το πλαίσιο δεδομένων που αποθηκεύει τις λεπτομέρειες του κλάδου

industrial_df = linuxhint_spark_app.createDataFrame([Σειρά(Τύπος= 'Γεωργία' ,Περιοχή= 'ΗΠΑ' ,
Βαθμολογία= 'Ζεστό' ,Σύνολο_εργαζομένων= 100 ),

Σειρά(Τύπος= 'Γεωργία' ,Περιοχή= 'Ινδία' ,Βαθμολογία= 'Ζεστό' ,Σύνολο_εργαζομένων= 200 ),

Σειρά(Τύπος= 'Ανάπτυξη' ,Περιοχή= 'ΗΠΑ' ,Βαθμολογία= 'Ζεστός' ,Σύνολο_εργαζομένων= 100 ),

Σειρά(Τύπος= 'Εκπαίδευση' ,Περιοχή= 'ΗΠΑ' ,Βαθμολογία= 'Δροσερός' ,Σύνολο_εργαζομένων= 400 ),

Σειρά(Τύπος= 'Εκπαίδευση' ,Περιοχή= 'ΗΠΑ' ,Βαθμολογία= 'Ζεστός' ,Σύνολο_εργαζομένων= είκοσι )

])

# Πραγματικό DataFrame

industrial_df.show()

# Γράψτε το industrial_df στο αρχείο παρκέ

industrial_df.coalesce( 1 ).γράψτε.παρκέ( 'industry_parquet' )

Παραγωγή:

Αυτό είναι το DataFrame που περιέχει 5 εγγραφές.

Δημιουργείται ένα αρχείο παρκέ για το προηγούμενο DataFrame. Εδώ, το όνομα του αρχείου μας με επέκταση είναι 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'. Χρησιμοποιούμε αυτό το αρχείο σε ολόκληρο το σεμινάριο.

Διαβάστε το αρχείο Parquet στο PySpark DataFrame

Έχουμε τη λίμα παρκέ. Ας διαβάσουμε αυτό το αρχείο χρησιμοποιώντας τη συνάρτηση read.parquet() και ας το φορτώσουμε στο PySpark DataFrame.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# Διαβάστε το αρχείο παρκέ στο αντικείμενο dataframe_from_parquet.

dataframe_from_parquet=linuzhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Εμφάνιση του dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Παραγωγή:

Εμφανίζουμε το DataFrame χρησιμοποιώντας τη μέθοδο show() που δημιουργήθηκε από το αρχείο παρκέ.

Ερωτήματα SQL με αρχείο Parquet

Μετά τη φόρτωση στο DataFrame, μπορεί να είναι δυνατή η δημιουργία των πινάκων SQL και η εμφάνιση των δεδομένων που υπάρχουν στο DataFrame. Πρέπει να δημιουργήσουμε μια ΠΡΟΣΩΡΙΝΗ ΠΡΟΒΟΛΗ και να χρησιμοποιήσουμε τις εντολές SQL για να επιστρέψουμε τις εγγραφές από το DataFrame που δημιουργείται από το αρχείο παρκέ.

Παράδειγμα 1:

Δημιουργήστε μια προσωρινή προβολή με το όνομα 'Sectors' και χρησιμοποιήστε την εντολή SELECT για να εμφανίσετε τις εγγραφές στο DataFrame. Μπορείτε να ανατρέξετε σε αυτό φροντιστήριο που εξηγεί πώς να δημιουργήσετε ένα VIEW στο Spark – SQL.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# Διαβάστε το αρχείο παρκέ στο αντικείμενο dataframe_from_parquet.

dataframe_from_parquet=linuzhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Δημιουργία προβολής από το παραπάνω αρχείο παρκέ με το όνομα - 'Sectors'

dataframe_from_parquet.createOrReplaceTempView( 'Τομείς' )

# Ερώτημα για εμφάνιση όλων των εγγραφών από τους Τομείς

linuxhint_spark_app.sql( 'επιλογή * από Τομείς' ).προβολή()

Παραγωγή:

Παράδειγμα 2:

Χρησιμοποιώντας την προηγούμενη VIEW, γράψτε το ερώτημα SQL:

  1. Για εμφάνιση όλων των εγγραφών από τους Τομείς που ανήκουν στην 'Ινδία'.
  2. Για να εμφανίσετε όλες τις εγγραφές από τους Τομείς με έναν υπάλληλο που είναι μεγαλύτερος από 100.
# Ερώτημα για εμφάνιση όλων των εγγραφών από τους Τομείς που ανήκουν στην 'Ινδία'.

linuxhint_spark_app.sql( 'επιλέξτε * από Τομείς όπου Περιοχή='Ινδία'' ).προβολή()

# Ερώτημα για εμφάνιση όλων των εγγραφών από τους Τομείς με εργαζόμενο μεγαλύτερο από 100

linuxhint_spark_app.sql( 'επιλέξτε * από Τομείς όπου Total_employees>100' ).προβολή()

Παραγωγή:

Υπάρχει μόνο ένα ρεκόρ με περιοχή που είναι η «Ινδία» και δύο ρεκόρ με υπαλλήλους που είναι μεγαλύτεροι από 100.

Διαβάστε το αρχείο Parquet στο PySpark SQL

Αρχικά, πρέπει να δημιουργήσουμε ένα VIEW χρησιμοποιώντας την εντολή CREATE. Χρησιμοποιώντας τη λέξη-κλειδί «διαδρομή» στο ερώτημα SQL, μπορούμε να διαβάσουμε το αρχείο παρκέ στο Spark SQL. Μετά τη διαδρομή, πρέπει να καθορίσουμε το όνομα αρχείου/την τοποθεσία του αρχείου.

Σύνταξη:

spark_app.sql( 'ΔΗΜΙΟΥΡΓΙΑ ΠΡΟΣΩΡΙΝΗ ΠΡΟΒΟΛΗ view_name ΜΕ ΧΡΗΣΗ ΕΠΙΛΟΓΩΝ παρκέ (διαδρομή ' όνομα_αρχείου.παρκέ ')' )

Παράδειγμα 1:

Δημιουργήστε μια προσωρινή προβολή με το όνομα 'Sector2' και διαβάστε το αρχείο παρκέ σε αυτήν. Χρησιμοποιώντας τη συνάρτηση sql(), γράψτε το ερώτημα επιλογής για να εμφανίσετε όλες τις εγγραφές που υπάρχουν στην προβολή.

εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

# Διαβάστε το αρχείο παρκέ στο Spark-SQL

linuxhint_spark_app.sql( 'ΔΗΜΙΟΥΡΓΙΑ ΠΡΟΣΩΡΙΝΗ ΠΡΟΒΟΛΗ Τομέα 2 ΜΕ ΧΡΗΣΗ ΕΠΙΛΟΓΩΝ παρκέ (διαδρομή ' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Ερώτημα για εμφάνιση όλων των εγγραφών από το Sector2

linuxhint_spark_app.sql( 'επιλογή * από το Sector2' ).προβολή()

Παραγωγή:

Παράδειγμα 2:

Χρησιμοποιήστε την προηγούμενη ΠΡΟΒΟΛΗ και γράψτε το ερώτημα για να εμφανίσετε όλες τις εγγραφές με τη βαθμολογία 'Hot' ή 'Cool'.

# Ερώτημα για εμφάνιση όλων των εγγραφών από το Sector2 με Rating- Hot ή Cool.

linuxhint_spark_app.sql( 'επιλέξτε * από τον Τομέα 2 όπου Βαθμολογία='Ζεστό' Ή Βαθμολογία='Δροσερό'' ).προβολή()

Παραγωγή:

Υπάρχουν τρεις δίσκοι με βαθμολογία 'Hot' ή 'Cool'.

συμπέρασμα

Στο PySpark, η συνάρτηση write.parquet() γράφει το DataFrame στο αρχείο παρκέ. Η συνάρτηση read.parquet() διαβάζει το αρχείο παρκέ στο PySpark DataFrame ή σε οποιοδήποτε άλλο DataSource. Μάθαμε πώς να διαβάζουμε το αρχείο παρκέ στο PySpark DataFrame και στον πίνακα PySpark. Ως μέρος αυτού του σεμιναρίου, συζητήσαμε επίσης πώς να δημιουργήσετε τους πίνακες από το PySpark DataFrame και να φιλτράρετε τα δεδομένα χρησιμοποιώντας τον όρο WHERE.