Πώς να εφαρμόσετε τη ροή δεδομένων σε πραγματικό χρόνο στην Python

Pos Na Epharmosete Te Roe Dedomenon Se Pragmatiko Chrono Sten Python



Η γνώση της εφαρμογής ροής δεδομένων σε πραγματικό χρόνο στην Python λειτουργεί ως βασική δεξιότητα στον σημερινό κόσμο που περιλαμβάνει δεδομένα. Αυτός ο οδηγός διερευνά τα βασικά βήματα και τα απαραίτητα εργαλεία για τη χρήση της ροής δεδομένων σε πραγματικό χρόνο με αυθεντικότητα στην Python. Από την επιλογή ενός κατάλληλου πλαισίου όπως το Apache Kafka ή το Apache Pulsar μέχρι τη σύνταξη ενός κώδικα Python για αβίαστη κατανάλωση δεδομένων, επεξεργασία και αποτελεσματική απεικόνιση, θα αποκτήσουμε τις απαραίτητες δεξιότητες για την κατασκευή των ευέλικτων και αποτελεσματικών καναλιών δεδομένων σε πραγματικό χρόνο.

Παράδειγμα 1: Υλοποίηση ροής δεδομένων σε πραγματικό χρόνο σε Python

Η εφαρμογή ροής δεδομένων σε πραγματικό χρόνο στην Python είναι ζωτικής σημασίας στη σημερινή εποχή και κόσμο που βασίζεται στα δεδομένα. Σε αυτό το λεπτομερές παράδειγμα, θα περπατήσουμε στη διαδικασία δημιουργίας ενός συστήματος ροής δεδομένων σε πραγματικό χρόνο χρησιμοποιώντας Apache Kafka και Python στο Google Colab.







Για να αρχικοποιήσουμε το παράδειγμα πριν ξεκινήσουμε την κωδικοποίηση, είναι απαραίτητη η δημιουργία ενός συγκεκριμένου περιβάλλοντος στο Google Colab. Το πρώτο πράγμα που πρέπει να κάνουμε είναι να εγκαταστήσουμε τις απαραίτητες βιβλιοθήκες. Χρησιμοποιούμε τη βιβλιοθήκη «kafka-python» για την ενσωμάτωση του Kafka.



! κουκούτσι εγκαθιστώ καφκα-πύθωνα


Αυτή η εντολή εγκαθιστά τη βιβλιοθήκη “kafka-python” που παρέχει τις συναρτήσεις Python και τις συνδέσεις για τον Apache Kafka. Στη συνέχεια, εισάγουμε τις απαιτούμενες βιβλιοθήκες για το έργο μας. Η εισαγωγή των απαιτούμενων βιβλιοθηκών, συμπεριλαμβανομένων των 'KafkaProducer' και 'KafkaConsumer' είναι οι κλάσεις από τη βιβλιοθήκη 'kafka-python' που μας επιτρέπουν να αλληλεπιδράσουμε με μεσίτες Kafka. Το JSON είναι η βιβλιοθήκη Python για να λειτουργεί με τα δεδομένα JSON τα οποία χρησιμοποιούμε για να σειριοποιήσουμε και να αποσειροποιήσουμε τα μηνύματα.



από την εισαγωγή kafka KafkaProducer, KafkaConsumer
εισαγωγή json


Δημιουργία παραγωγού Κάφκα





Αυτό είναι σημαντικό γιατί ένας παραγωγός Κάφκα στέλνει τα δεδομένα σε ένα θέμα του Κάφκα. Στο παράδειγμά μας, δημιουργούμε έναν παραγωγό για την αποστολή προσομοιωμένων δεδομένων σε πραγματικό χρόνο σε ένα θέμα που ονομάζεται 'θέμα σε πραγματικό χρόνο'.

Δημιουργούμε ένα στιγμιότυπο «KafkaProducer» που καθορίζει τη διεύθυνση του μεσίτη Kafka ως «localhost:9092». Στη συνέχεια, χρησιμοποιούμε το “value_serializer”, μια συνάρτηση που σειριοποιεί τα δεδομένα πριν τα στείλει στον Κάφκα. Στην περίπτωσή μας, μια συνάρτηση λάμδα κωδικοποιεί τα δεδομένα ως JSON με κωδικοποίηση UTF-8. Τώρα, ας προσομοιώσουμε κάποια δεδομένα σε πραγματικό χρόνο και ας τα στείλουμε στο θέμα του Κάφκα.



παραγωγός = KafkaΠαραγωγός ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( σε ) .κωδικοποιώ ( 'utf-8' ) )
# Προσομοιωμένα δεδομένα σε πραγματικό χρόνο
δεδομένα = { 'sensor_id' : 1 , 'θερμοκρασία' : 25.5 , 'υγρασία' : 60.2 }
# Αποστολή δεδομένων στο θέμα
παραγωγός.αποστολή ( 'θέμα σε πραγματικό χρόνο' , δεδομένα )


Σε αυτές τις γραμμές, ορίζουμε ένα λεξικό «δεδομένων» που αντιπροσωπεύει δεδομένα προσομοίωσης αισθητήρα. Στη συνέχεια χρησιμοποιούμε τη μέθοδο «αποστολή» για να δημοσιεύσουμε αυτά τα δεδομένα στο «θέμα σε πραγματικό χρόνο».

Στη συνέχεια, θέλουμε να δημιουργήσουμε έναν καταναλωτή Κάφκα και ένας καταναλωτής Κάφκα διαβάζει τα δεδομένα από ένα θέμα Κάφκα. Δημιουργούμε έναν καταναλωτή για να καταναλώνει και να επεξεργάζεται τα μηνύματα στο «θέμα σε πραγματικό χρόνο». Δημιουργούμε μια παρουσία 'KafkaConsumer', προσδιορίζοντας το θέμα που θέλουμε να καταναλώσουμε, π.χ. (θέμα σε πραγματικό χρόνο) και τη διεύθυνση του μεσίτη Kafka. Στη συνέχεια, το 'value_deserializer' είναι μια συνάρτηση που αποσειρώνει τα δεδομένα που λαμβάνονται από τον Kafka. Στην περίπτωσή μας, μια συνάρτηση λάμδα αποκωδικοποιεί τα δεδομένα ως JSON με κωδικοποίηση UTF-8.

καταναλωτής = KafkaΚαταναλωτής ( 'θέμα σε πραγματικό χρόνο' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.αποκωδικοποιώ ( 'utf-8' ) ) )


Χρησιμοποιούμε έναν επαναληπτικό βρόχο για να καταναλώνουμε και να επεξεργαζόμαστε συνεχώς τα μηνύματα από το θέμα.

# Ανάγνωση και επεξεργασία δεδομένων σε πραγματικό χρόνο
Για μήνυμα σε καταναλωτής:
δεδομένα = μήνυμα.τιμή
Τυπώνω ( φά 'Εληφθέντα δεδομένα: {data}' )


Ανακτούμε την τιμή κάθε μηνύματος και τα δεδομένα προσομοίωσης του αισθητήρα μέσα στον βρόχο και τα εκτυπώνουμε στην κονσόλα. Η εκτέλεση του παραγωγού και του καταναλωτή Kafka περιλαμβάνει την εκτέλεση αυτού του κώδικα στο Google Colab και την εκτέλεση των κελιών κώδικα μεμονωμένα. Ο παραγωγός στέλνει τα προσομοιωμένα δεδομένα στο θέμα Kafka και ο καταναλωτής διαβάζει και εκτυπώνει τα δεδομένα που έλαβε.


Ανάλυση της εξόδου καθώς εκτελείται ο κώδικας

Θα παρατηρήσουμε δεδομένα σε πραγματικό χρόνο που παράγονται και καταναλώνονται. Η μορφή δεδομένων μπορεί να διαφέρει ανάλογα με την προσομοίωση ή την πραγματική πηγή δεδομένων. Σε αυτό το λεπτομερές παράδειγμα, καλύπτουμε ολόκληρη τη διαδικασία ρύθμισης ενός συστήματος ροής δεδομένων σε πραγματικό χρόνο χρησιμοποιώντας Apache Kafka και Python στο Google Colab. Θα εξηγήσουμε κάθε γραμμή κώδικα και τη σημασία της στη δημιουργία αυτού του συστήματος. Η ροή δεδομένων σε πραγματικό χρόνο είναι μια ισχυρή δυνατότητα και αυτό το παράδειγμα χρησιμεύει ως βάση για πιο σύνθετες εφαρμογές του πραγματικού κόσμου.

Παράδειγμα 2: Υλοποίηση ροής δεδομένων σε πραγματικό χρόνο σε Python με χρήση δεδομένων χρηματιστηρίου

Ας κάνουμε ένα άλλο μοναδικό παράδειγμα υλοποίησης ροής δεδομένων σε πραγματικό χρόνο στην Python χρησιμοποιώντας ένα διαφορετικό σενάριο. αυτή τη φορά, θα επικεντρωθούμε στα δεδομένα της χρηματιστηριακής αγοράς. Δημιουργούμε ένα σύστημα ροής δεδομένων σε πραγματικό χρόνο που καταγράφει τις αλλαγές στις τιμές των μετοχών και τις επεξεργάζεται χρησιμοποιώντας το Apache Kafka και την Python στο Google Colab. Όπως αποδείχθηκε στο προηγούμενο παράδειγμα, ξεκινάμε διαμορφώνοντας το περιβάλλον μας στο Google Colab. Αρχικά, εγκαθιστούμε τις απαιτούμενες βιβλιοθήκες:

! κουκούτσι εγκαθιστώ kafka-python yfinance


Εδώ, προσθέτουμε τη βιβλιοθήκη «yfinance» που μας επιτρέπει να λαμβάνουμε δεδομένα χρηματιστηρίου σε πραγματικό χρόνο. Στη συνέχεια, εισάγουμε τις απαραίτητες βιβλιοθήκες. Συνεχίζουμε να χρησιμοποιούμε τις τάξεις «KafkaProducer» και «KafkaConsumer» από τη βιβλιοθήκη «kafka-python» για την αλληλεπίδραση με τον Κάφκα. Εισάγουμε JSON για εργασία με τα δεδομένα JSON. Χρησιμοποιούμε επίσης το 'yfinance' για να λάβουμε δεδομένα χρηματιστηρίου σε πραγματικό χρόνο. Εισάγουμε επίσης τη βιβλιοθήκη 'χρόνου' για να προσθέσουμε μια χρονική καθυστέρηση για την προσομοίωση των ενημερώσεων σε πραγματικό χρόνο.

από την εισαγωγή kafka KafkaProducer, KafkaConsumer
εισαγωγή json
εισαγωγές yfinance όπως και yf
εισαγωγή χρόνος


Τώρα, δημιουργούμε έναν παραγωγό Kafka για δεδομένα μετοχών. Ο παραγωγός μας Kafka λαμβάνει δεδομένα μετοχών σε πραγματικό χρόνο και τα στέλνει σε ένα θέμα Kafka που ονομάζεται 'stock-price'.

παραγωγός = KafkaΠαραγωγός ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( σε ) .κωδικοποιώ ( 'utf-8' ) )

ενώ Αληθής:
μετοχή = yf.Ticker ( 'AAPL' ) # Παράδειγμα: μετοχή Apple Inc
stock_data = stock.history ( περίοδος = '1η' )
τελευταία_τιμή = απόθεμα_στοιχεία [ 'Κλείσε' ] .iloc [ - 1 ]
δεδομένα = { 'σύμβολο' : 'AAPL' , 'τιμή' : τελική τιμή }
παραγωγός.αποστολή ( 'τιμή μετοχής' , δεδομένα )
ώρα.ύπνο ( 10 ) # Προσομοίωση ενημερώσεων σε πραγματικό χρόνο κάθε 10 δευτερόλεπτα


Δημιουργούμε μια παρουσία 'KafkaProducer' με τη διεύθυνση του μεσίτη Kafka σε αυτόν τον κώδικα. Μέσα στον βρόχο, χρησιμοποιούμε το 'yfinance' για να λάβουμε την πιο πρόσφατη τιμή μετοχής για την Apple Inc. ('AAPL'). Στη συνέχεια, εξάγουμε την τελευταία τιμή κλεισίματος και τη στέλνουμε στο θέμα «stock-price». Τελικά, εισάγουμε μια χρονική καθυστέρηση για την προσομοίωση των ενημερώσεων σε πραγματικό χρόνο κάθε 10 δευτερόλεπτα.

Ας δημιουργήσουμε έναν καταναλωτή Kafka για να διαβάσει και να επεξεργαστεί τα δεδομένα των τιμών των μετοχών από το θέμα «απόθεμα-τιμή».

καταναλωτής = KafkaConsumer ( 'τιμή μετοχής' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.αποκωδικοποιώ ( 'utf-8' ) ) )

Για μήνυμα σε καταναλωτής:
stock_data = μήνυμα.τιμή
Τυπώνω ( φά 'Δεδομένα ληφθέντων αποθεμάτων: {stock_data['symbol']} - Τιμή: {stock_data['price']}' )


Αυτός ο κώδικας είναι παρόμοιος με τη ρύθμιση καταναλωτή του προηγούμενου παραδείγματος. Διαβάζει και επεξεργάζεται συνεχώς τα μηνύματα από το θέμα «stock-price» και εκτυπώνει το σύμβολο μετοχής και την τιμή στην κονσόλα. Εκτελούμε τα κελιά κώδικα διαδοχικά, π.χ. ένα προς ένα στο Google Colab για να τρέξουμε τον παραγωγό και τον καταναλωτή. Ο παραγωγός λαμβάνει και στέλνει τις ενημερώσεις τιμών μετοχών σε πραγματικό χρόνο, ενώ ο καταναλωτής διαβάζει και εμφανίζει αυτά τα δεδομένα.

! κουκούτσι εγκαθιστώ kafka-python yfinance
από την εισαγωγή kafka KafkaProducer, KafkaConsumer
εισαγωγή json
εισαγωγές yfinance όπως και yf
εισαγωγή χρόνος
παραγωγός = KafkaΠαραγωγός ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( σε ) .κωδικοποιώ ( 'utf-8' ) )

ενώ Αληθής:
μετοχή = yf.Ticker ( 'AAPL' ) # Μετοχή Apple Inc
stock_data = stock.history ( περίοδος = '1η' )
τελευταία_τιμή = απόθεμα_στοιχεία [ 'Κλείσε' ] .iloc [ - 1 ]

δεδομένα = { 'σύμβολο' : 'AAPL' , 'τιμή' : τελική τιμή }

παραγωγός.αποστολή ( 'τιμή μετοχής' , δεδομένα )

ώρα.ύπνο ( 10 ) # Προσομοίωση ενημερώσεων σε πραγματικό χρόνο κάθε 10 δευτερόλεπτα
καταναλωτής = KafkaConsumer ( 'τιμή μετοχής' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.αποκωδικοποιώ ( 'utf-8' ) ) )

Για μήνυμα σε καταναλωτής:
stock_data = μήνυμα.τιμή
Τυπώνω ( φά 'Δεδομένα ληφθέντων αποθεμάτων: {stock_data['symbol']} - Τιμή: {stock_data['price']}' )


Στην ανάλυση της παραγωγής μετά την εκτέλεση του κώδικα, θα παρατηρήσουμε τις ενημερώσεις τιμών μετοχών σε πραγματικό χρόνο για την Apple Inc. να παράγονται και να καταναλώνονται.

συμπέρασμα

Σε αυτό το μοναδικό παράδειγμα, δείξαμε την εφαρμογή ροής δεδομένων σε πραγματικό χρόνο στην Python χρησιμοποιώντας τον Apache Kafka και τη βιβλιοθήκη «yfinance» για τη λήψη και επεξεργασία των δεδομένων του χρηματιστηρίου. Εξηγήσαμε διεξοδικά κάθε γραμμή του κώδικα. Η ροή δεδομένων σε πραγματικό χρόνο μπορεί να εφαρμοστεί σε διάφορα πεδία για τη δημιουργία των πραγματικών εφαρμογών στα χρηματοοικονομικά, το IoT και πολλά άλλα.