PySpark Pandas_Udf()

Pyspark Pandas Udf



Ο μετασχηματισμός του PySpark DataFrame είναι δυνατός χρησιμοποιώντας τη συνάρτηση pandas_udf(). Είναι μια συνάρτηση καθορισμένη από το χρήστη που εφαρμόζεται στο PySpark DataFrame με βέλος. Μπορούμε να εκτελέσουμε τις διανυσματικές πράξεις χρησιμοποιώντας την pandas_udf(). Μπορεί να υλοποιηθεί περνώντας αυτή τη λειτουργία ως διακοσμητής. Ας βουτήξουμε σε αυτόν τον οδηγό για να γνωρίζουμε τη σύνταξη, τις παραμέτρους και διάφορα παραδείγματα.

Θέμα Περιεχομένων:

Εάν θέλετε να μάθετε για την εγκατάσταση του PySpark DataFrame και της μονάδας, προχωρήστε σε αυτό άρθρο .







Pyspark.sql.functions.pandas_udf()

Το pandas_udf () είναι διαθέσιμο στη μονάδα sql.functions στο PySpark, η οποία μπορεί να εισαχθεί χρησιμοποιώντας τη λέξη-κλειδί 'από'. Χρησιμοποιείται για την εκτέλεση των διανυσματικών λειτουργιών στο PySpark DataFrame μας. Αυτή η λειτουργία υλοποιείται σαν διακοσμητής περνώντας τρεις παραμέτρους. Μετά από αυτό, μπορούμε να δημιουργήσουμε μια συνάρτηση που ορίζεται από το χρήστη που επιστρέφει τα δεδομένα σε διανυσματική μορφή (όπως χρησιμοποιούμε τη σειρά/NumPy για αυτό) χρησιμοποιώντας ένα βέλος. Μέσα σε αυτή τη συνάρτηση, μπορούμε να επιστρέψουμε το αποτέλεσμα.



Δομή & Σύνταξη:



Αρχικά, ας δούμε τη δομή και τη σύνταξη αυτής της συνάρτησης:

@pandas_udf(τύπος δεδομένων)
def function_name(operation) -> convert_format:
δήλωση επιστροφής

Εδώ, το όνομα_λειτουργίας είναι το όνομα της καθορισμένης συνάρτησής μας. Ο τύπος δεδομένων καθορίζει τον τύπο δεδομένων που επιστρέφεται από αυτήν τη συνάρτηση. Μπορούμε να επιστρέψουμε το αποτέλεσμα χρησιμοποιώντας τη λέξη-κλειδί 'επιστροφή'. Όλες οι λειτουργίες εκτελούνται μέσα στη συνάρτηση με την εκχώρηση βέλους.





Pandas_udf (Function and ReturnType)

  1. Η πρώτη παράμετρος είναι η συνάρτηση που ορίζει ο χρήστης που της μεταβιβάζεται.
  2. Η δεύτερη παράμετρος χρησιμοποιείται για τον καθορισμό του τύπου δεδομένων επιστροφής από τη συνάρτηση.

Δεδομένα:

Σε ολόκληρο αυτόν τον οδηγό, χρησιμοποιούμε μόνο ένα PySpark DataFrame για επίδειξη. Όλες οι συναρτήσεις που ορίζονται από το χρήστη που ορίζουμε εφαρμόζονται σε αυτό το PySpark DataFrame. Βεβαιωθείτε ότι έχετε δημιουργήσει αυτό το DataFrame στο περιβάλλον σας πρώτα μετά την εγκατάσταση του PySpark.



εισαγωγή pyspark

από το pyspark.sql εισαγωγή SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Συμβουλή Linux' .getOrCreate()

από το pyspark.sql.functions εισαγωγή pandas_udf

από το pyspark.sql.types εισαγωγή *

εισάγετε τα πάντα ως πάντα

# λεπτομέρειες λαχανικών

λαχανικά =[{ 'τύπος' : 'λαχανικό' , 'όνομα' : 'ντομάτα' , 'locate_country' : 'ΗΠΑ' , 'ποσότητα' : 800 },

{ 'τύπος' : 'καρπός' , 'όνομα' : 'μπανάνα' , 'locate_country' : 'ΚΙΝΑ' , 'ποσότητα' : είκοσι },

{ 'τύπος' : 'λαχανικό' , 'όνομα' : 'ντομάτα' , 'locate_country' : 'ΗΠΑ' , 'ποσότητα' : 800 },

{ 'τύπος' : 'λαχανικό' , 'όνομα' : 'Μάνγκο' , 'locate_country' : 'ΙΑΠΩΝΙΑ' , 'ποσότητα' : 0 },

{ 'τύπος' : 'καρπός' , 'όνομα' : 'λεμόνι' , 'locate_country' : 'ΙΝΔΙΑ' , 'ποσότητα' : 1700 },

{ 'τύπος' : 'λαχανικό' , 'όνομα' : 'ντομάτα' , 'locate_country' : 'ΗΠΑ' , 'ποσότητα' : 1200 },

{ 'τύπος' : 'λαχανικό' , 'όνομα' : 'Μάνγκο' , 'locate_country' : 'ΙΑΠΩΝΙΑ' , 'ποσότητα' : 0 },

{ 'τύπος' : 'καρπός' , 'όνομα' : 'λεμόνι' , 'locate_country' : 'ΙΝΔΙΑ' , 'ποσότητα' : 0 }

]

# δημιουργήστε το πλαίσιο δεδομένων αγοράς από τα παραπάνω δεδομένα

market_df = linuxhint_spark_app.createDataFrame(λαχανικά)

market_df.show()

Παραγωγή:

Εδώ, δημιουργούμε αυτό το DataFrame με 4 στήλες και 8 σειρές. Τώρα, χρησιμοποιούμε την pandas_udf() για να δημιουργήσουμε τις συναρτήσεις που ορίζονται από το χρήστη και να τις εφαρμόσουμε σε αυτές τις στήλες.

Pandas_udf() με διαφορετικούς τύπους δεδομένων

Σε αυτό το σενάριο, δημιουργούμε ορισμένες συναρτήσεις που καθορίζονται από το χρήστη με το pandas_udf() και τις εφαρμόζουμε σε στήλες και εμφανίζουμε τα αποτελέσματα χρησιμοποιώντας τη μέθοδο select(). Σε κάθε περίπτωση, χρησιμοποιούμε τα pandas.Series καθώς εκτελούμε τις διανυσματικές πράξεις. Αυτό θεωρεί τις τιμές της στήλης ως μονοδιάστατο πίνακα και η πράξη εφαρμόζεται στη στήλη. Στον ίδιο τον διακοσμητή, καθορίζουμε τον τύπο επιστροφής συνάρτησης.

Παράδειγμα 1: Pandas_udf() με Τύπο συμβολοσειράς

Εδώ, δημιουργούμε δύο συναρτήσεις που καθορίζονται από το χρήστη με τον τύπο επιστροφής συμβολοσειράς για να μετατρέψουμε τις τιμές στήλης τύπου συμβολοσειράς σε κεφαλαία και πεζά. Τέλος, εφαρμόζουμε αυτές τις συναρτήσεις στις στήλες «type» και «locate_country».

# Μετατρέψτε τη στήλη τύπου σε κεφαλαία με το pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

επιστροφή i.str.upper()

# Μετατρέψτε τη στήλη locate_country σε πεζά με το pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

επιστροφή i.str.lower()

# Εμφάνιση των στηλών χρησιμοποιώντας select()

market_df.select( 'τύπος' ,type_upper_case( 'τύπος' ), 'locate_country' ,
χώρα_πεζά_κεφαλαία( 'locate_country' )).προβολή()

Παραγωγή:

Εξήγηση:

Η συνάρτηση StringType() είναι διαθέσιμη στη λειτουργική μονάδα pyspark.sql.types. Έχουμε ήδη εισαγάγει αυτήν την ενότητα κατά τη δημιουργία του PySpark DataFrame.

  1. Πρώτον, το UDF (συνάρτηση που ορίζεται από το χρήστη) επιστρέφει τις συμβολοσειρές με κεφαλαία χρησιμοποιώντας τη συνάρτηση str.upper(). Η str.upper() είναι διαθέσιμη στη δομή δεδομένων σειράς (καθώς μετατρέπουμε σε σειρά με ένα βέλος μέσα στη συνάρτηση) η οποία μετατρέπει τη δεδομένη συμβολοσειρά σε κεφαλαία. Τέλος, αυτή η συνάρτηση εφαρμόζεται στη στήλη 'type' που καθορίζεται στη μέθοδο select(). Προηγουμένως, όλες οι συμβολοσειρές στη στήλη τύπου είναι με πεζά. Τώρα, έχουν αλλάξει σε κεφαλαία.
  2. Δεύτερον, το UDF επιστρέφει τις συμβολοσειρές με κεφαλαία χρησιμοποιώντας τη συνάρτηση str.lower(). Η str.lower() είναι διαθέσιμη στη δομή δεδομένων σειράς που μετατρέπει τη δεδομένη συμβολοσειρά σε πεζά. Τέλος, αυτή η συνάρτηση εφαρμόζεται στη στήλη 'type' που καθορίζεται στη μέθοδο select(). Προηγουμένως, όλες οι συμβολοσειρές στη στήλη τύπου είναι με κεφαλαία. Τώρα, έχουν αλλάξει σε πεζά.

Παράδειγμα 2: Pandas_udf() με ακέραιο τύπο

Ας δημιουργήσουμε ένα UDF που μετατρέπει την ακέραια στήλη PySpark DataFrame στη σειρά Pandas και προσθέτουμε 100 σε κάθε τιμή. Περάστε τη στήλη 'ποσότητα' σε αυτήν τη συνάρτηση μέσα στη μέθοδο select().

# Προσθέστε 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

επιστροφή i+ 100

# Περάστε τη στήλη ποσότητας στην παραπάνω συνάρτηση και εμφανίστε.

market_df.select( 'ποσότητα' ,add_100( 'ποσότητα' )).προβολή()

Παραγωγή:

Εξήγηση:

Μέσα στο UDF, επαναλαμβάνουμε όλες τις τιμές και τις μετατρέπουμε σε Σειρά. Μετά από αυτό, προσθέτουμε 100 σε κάθε τιμή στη σειρά. Τέλος, περνάμε τη στήλη «ποσότητα» σε αυτή τη συνάρτηση και βλέπουμε ότι το 100 προστίθεται σε όλες τις τιμές.

Pandas_udf() με διαφορετικούς τύπους δεδομένων χρησιμοποιώντας Groupby() & Agg()

Ας δούμε τα παραδείγματα για να περάσουμε το UDF στις συγκεντρωτικές στήλες. Εδώ, οι τιμές στηλών ομαδοποιούνται πρώτα χρησιμοποιώντας τη συνάρτηση groupby() και η συγκέντρωση γίνεται χρησιμοποιώντας τη συνάρτηση agg(). Περνάμε το UDF μας μέσα σε αυτήν τη αθροιστική συνάρτηση.

Σύνταξη:

pyspark_dataframe_object.groupby( 'ομαδοποίηση_στήλης' ).agg(UDF
(pyspark_dataframe_object[ 'στήλη' ]))

Εδώ, οι τιμές στη στήλη ομαδοποίησης ομαδοποιούνται πρώτα. Στη συνέχεια, η συγκέντρωση γίνεται σε κάθε ομαδοποιημένο στοιχείο σε σχέση με το UDF μας.

Παράδειγμα 1: Pandas_udf() με Αθροιστική Μέση()

Εδώ, δημιουργούμε μια συνάρτηση που ορίζεται από το χρήστη με έναν τύπο επιστροφής float. Μέσα στη συνάρτηση, υπολογίζουμε τον μέσο όρο χρησιμοποιώντας τη συνάρτηση mean(). Αυτό το UDF μεταβιβάζεται στη στήλη 'ποσότητα' για να ληφθεί η μέση ποσότητα για κάθε τύπο.

# επιστρέψτε τον μέσο όρο/μέσο

@pandas_udf( 'φλοτέρ' )

def average_function(i: panda.Series) -> float:

επιστροφή i.mean()

# Περάστε τη στήλη ποσότητας στη συνάρτηση ομαδοποιώντας τη στήλη τύπου.

market_df.groupby( 'τύπος' ).agg(average_function(market_df[ 'ποσότητα' ])).προβολή()

Παραγωγή:

Ομαδοποιούμε με βάση στοιχεία στη στήλη 'τύπος'. Δημιουργούνται δύο ομάδες - 'φρούτα' και 'λαχανικά'. Για κάθε ομάδα, ο μέσος όρος υπολογίζεται και επιστρέφεται.

Παράδειγμα 2: Pandas_udf() με Aggregate Max() και Min()

Εδώ, δημιουργούμε δύο συναρτήσεις που καθορίζονται από το χρήστη με τον τύπο επιστροφής ακέραιου αριθμού (int). Το πρώτο UDF επιστρέφει την ελάχιστη τιμή και το δεύτερο UDF τη μέγιστη τιμή.

# pandas_udf που επιστρέφουν την ελάχιστη τιμή

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

επιστροφή i.min()

# pandas_udf που επιστρέφουν τη μέγιστη τιμή

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

επιστροφή i.max()

# Περάστε τη στήλη ποσότητας στο min_ pandas_udf ομαδοποιώντας locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'ποσότητα' ])).προβολή()

# Περάστε τη στήλη ποσότητας στο max_ pandas_udf ομαδοποιώντας locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'ποσότητα' ])).προβολή()

Παραγωγή:

Για να επιστρέψουμε ελάχιστες και μέγιστες τιμές, χρησιμοποιούμε τις συναρτήσεις min() και max() στον τύπο επιστροφής των UDF. Τώρα, ομαδοποιούμε τα δεδομένα στη στήλη 'locate_country'. Δημιουργούνται τέσσερις ομάδες («ΚΙΝΑ», «ΙΝΔΙΑ», «ΙΑΠΩΝΙΑ», «ΗΠΑ»). Για κάθε ομάδα επιστρέφουμε τη μέγιστη ποσότητα. Αντίστοιχα επιστρέφουμε την ελάχιστη ποσότητα.

συμπέρασμα

Βασικά, το pandas_udf () χρησιμοποιείται για την εκτέλεση των διανυσματικών λειτουργιών στο PySpark DataFrame μας. Είδαμε πώς να δημιουργήσουμε το pandas_udf() και να το εφαρμόσουμε στο PySpark DataFrame. Για καλύτερη κατανόηση, συζητήσαμε τα διαφορετικά παραδείγματα λαμβάνοντας υπόψη όλους τους τύπους δεδομένων (string, float και integer). Μπορεί να είναι δυνατή η χρήση του pandas_udf() με groupby() μέσω της συνάρτησης agg().