పైథాన్‌లో రియల్-టైమ్ డేటా స్ట్రీమింగ్‌ని ఎలా అమలు చేయాలి

Paithan Lo Riyal Taim Deta Striming Ni Ela Amalu Ceyali



పైథాన్‌లో నిజ-సమయ డేటా స్ట్రీమింగ్ అమలులో నైపుణ్యం సాధించడం నేటి డేటా ప్రమేయం ఉన్న ప్రపంచంలో ముఖ్యమైన నైపుణ్యంగా పనిచేస్తుంది. ఈ గైడ్ పైథాన్‌లో ప్రామాణికతతో నిజ-సమయ డేటా స్ట్రీమింగ్‌ను ఉపయోగించడం కోసం ప్రధాన దశలు మరియు అవసరమైన సాధనాలను అన్వేషిస్తుంది. అపాచీ కాఫ్కా లేదా అపాచీ పల్సర్ వంటి యుక్తమైన ఫ్రేమ్‌వర్క్‌ను ఎంచుకోవడం నుండి అప్రయత్నంగా డేటా వినియోగం, ప్రాసెసింగ్ మరియు ప్రభావవంతమైన విజువలైజేషన్ కోసం పైథాన్ కోడ్‌ను వ్రాయడం వరకు, మేము చురుకైన మరియు సమర్థవంతమైన నిజ-సమయ డేటా ఛానెల్‌లను రూపొందించడానికి అవసరమైన నైపుణ్యాలను పొందుతాము.

ఉదాహరణ 1: పైథాన్‌లో రియల్-టైమ్ డేటా స్ట్రీమింగ్ అమలు

పైథాన్‌లో నిజ-సమయ డేటా స్ట్రీమింగ్‌ని అమలు చేయడం నేటి డేటా-ఆధారిత యుగం మరియు ప్రపంచంలో కీలకం. ఈ వివరణాత్మక ఉదాహరణలో, మేము Google Colabలో Apache Kafka మరియు Pythonని ఉపయోగించి నిజ-సమయ డేటా స్ట్రీమింగ్ సిస్టమ్‌ను రూపొందించే ప్రక్రియ ద్వారా నడుస్తాము.







మేము కోడింగ్ ప్రారంభించే ముందు ఉదాహరణను ప్రారంభించడానికి, Google Colabలో నిర్దిష్ట వాతావరణాన్ని నిర్మించడం అవసరం. మేము చేయవలసిన మొదటి విషయం అవసరమైన లైబ్రరీలను ఇన్స్టాల్ చేయడం. మేము కాఫ్కా ఇంటిగ్రేషన్ కోసం “కాఫ్కా-పైథాన్” లైబ్రరీని ఉపయోగిస్తాము.



! పిప్ ఇన్స్టాల్ కాఫ్కా-పైథాన్


ఈ కమాండ్ “కాఫ్కా-పైథాన్” లైబ్రరీని ఇన్‌స్టాల్ చేస్తుంది, ఇది పైథాన్ ఫంక్షన్‌లను మరియు అపాచీ కాఫ్కా కోసం బైండింగ్‌లను అందిస్తుంది. తరువాత, మేము మా ప్రాజెక్ట్ కోసం అవసరమైన లైబ్రరీలను దిగుమతి చేస్తాము. 'కాఫ్కాప్రొడ్యూసర్' మరియు 'కాఫ్కా కన్స్యూమర్'తో సహా అవసరమైన లైబ్రరీలను దిగుమతి చేసుకోవడం అనేది 'కాఫ్కా-పైథాన్' లైబ్రరీ నుండి తరగతులు, ఇది కాఫ్కా బ్రోకర్లతో పరస్పర చర్య చేయడానికి మాకు వీలు కల్పిస్తుంది. JSON అనేది JSON డేటాతో పని చేయడానికి పైథాన్ లైబ్రరీ, మేము సందేశాలను సీరియలైజ్ చేయడానికి మరియు డీరియలైజ్ చేయడానికి ఉపయోగిస్తాము.



కాఫ్కా దిగుమతి కాఫ్కా ప్రొడ్యూసర్, కాఫ్కా కన్స్యూమర్ నుండి
json దిగుమతి


కాఫ్కా నిర్మాత సృష్టి





కాఫ్కా నిర్మాత కాఫ్కా టాపిక్‌కి డేటాను పంపడం వలన ఇది చాలా ముఖ్యం. మా ఉదాహరణలో, “రియల్ టైమ్ టాపిక్” అనే అంశానికి అనుకరణ చేసిన నిజ-సమయ డేటాను పంపడానికి మేము నిర్మాతను సృష్టిస్తాము.

మేము కాఫ్కా బ్రోకర్ చిరునామాను 'localhost:9092'గా పేర్కొనే 'కాఫ్కాప్రొడ్యూసర్' ఉదాహరణను సృష్టిస్తాము. అప్పుడు, మేము 'value_serializer'ని ఉపయోగిస్తాము, ఇది కాఫ్కాకు పంపే ముందు డేటాను ధారావాహికంగా మార్చే ఒక ఫంక్షన్. మా విషయంలో, లాంబ్డా ఫంక్షన్ డేటాను UTF-8-ఎన్‌కోడ్ చేసిన JSONగా ఎన్‌కోడ్ చేస్తుంది. ఇప్పుడు, కొంత నిజ-సమయ డేటాను అనుకరించండి మరియు దానిని కాఫ్కా టాపిక్‌కు పంపండి.



నిర్మాత = కాఫ్కా నిర్మాత ( bootstrap_servers = 'లోకల్ హోస్ట్:9092' ,
విలువ_సిరియలైజర్ =lambda v: json.dumps ( లో ) .ఎన్కోడ్ ( 'utf-8' ) )
# అనుకరణ నిజ-సమయ డేటా
డేటా = { 'సెన్సార్_ఐడి' : 1 , 'ఉష్ణోగ్రత' : 25.5 , 'తేమ' : 60.2 }
# టాపిక్‌కు డేటాను పంపుతోంది
నిర్మాత.పంపు ( 'నిజ సమయ అంశం' , సమాచారం )


ఈ లైన్‌లలో, మేము అనుకరణ సెన్సార్ డేటాను సూచించే “డేటా” నిఘంటువుని నిర్వచించాము. మేము ఈ డేటాను 'రియల్ టైమ్-టాపిక్'కి ప్రచురించడానికి 'పంపు' పద్ధతిని ఉపయోగిస్తాము.

అప్పుడు, మేము కాఫ్కా వినియోగదారుని సృష్టించాలనుకుంటున్నాము మరియు కాఫ్కా వినియోగదారు కాఫ్కా అంశం నుండి డేటాను చదువుతారు. మేము 'రియల్ టైమ్-టాపిక్'లో సందేశాలను వినియోగించుకోవడానికి మరియు ప్రాసెస్ చేయడానికి వినియోగదారుని సృష్టిస్తాము. మేము 'కాఫ్కా కన్స్యూమర్' ఉదాహరణను సృష్టిస్తాము, మేము వినియోగించాలనుకుంటున్న అంశాన్ని పేర్కొంటాము, ఉదా., (రియల్-టైమ్-టాపిక్) మరియు కాఫ్కా బ్రోకర్ చిరునామా. అప్పుడు, “value_deserializer” అనేది కాఫ్కా నుండి స్వీకరించబడిన డేటాను డీరియలైజ్ చేసే ఒక ఫంక్షన్. మా విషయంలో, లాంబ్డా ఫంక్షన్ డేటాను UTF-8-ఎన్‌కోడ్ చేసిన JSONగా డీకోడ్ చేస్తుంది.

వినియోగదారు = కాఫ్కా వినియోగదారుడు ( 'నిజ సమయ అంశం' ,
bootstrap_servers = 'లోకల్ హోస్ట్:9092' ,
విలువ_డీసరియలైజర్ = లాంబ్డా x: json.loads ( x.డీకోడ్ ( 'utf-8' ) ) )


మేము అంశం నుండి సందేశాలను నిరంతరం వినియోగించుకోవడానికి మరియు ప్రాసెస్ చేయడానికి పునరుక్తి లూప్‌ని ఉపయోగిస్తాము.

# నిజ-సమయ డేటాను చదవడం మరియు ప్రాసెస్ చేయడం
కోసం సందేశం లో వినియోగదారు:
డేటా = సందేశం.విలువ
ముద్రణ ( f 'అందుకున్న డేటా: {data}' )


మేము ప్రతి సందేశం యొక్క విలువను మరియు లూప్‌లోని మా అనుకరణ సెన్సార్ డేటాను తిరిగి పొందుతాము మరియు దానిని కన్సోల్‌కు ప్రింట్ చేస్తాము. కాఫ్కా నిర్మాత మరియు వినియోగదారుని అమలు చేయడంలో ఈ కోడ్‌ని Google Colabలో అమలు చేయడం మరియు కోడ్ సెల్‌లను ఒక్కొక్కటిగా అమలు చేయడం వంటివి ఉంటాయి. నిర్మాత అనుకరణ డేటాను కాఫ్కా టాపిక్‌కు పంపుతారు మరియు వినియోగదారు అందుకున్న డేటాను చదివి ప్రింట్ చేస్తారు.


కోడ్ రన్‌గా అవుట్‌పుట్ యొక్క విశ్లేషణ

మేము ఉత్పత్తి చేయబడిన మరియు వినియోగించబడుతున్న నిజ-సమయ డేటాను గమనిస్తాము. మా అనుకరణ లేదా వాస్తవ డేటా మూలాన్ని బట్టి డేటా ఫార్మాట్ మారవచ్చు. ఈ వివరణాత్మక ఉదాహరణలో, Google Colabలో Apache Kafka మరియు Pythonని ఉపయోగించి నిజ-సమయ డేటా స్ట్రీమింగ్ సిస్టమ్‌ను సెటప్ చేసే మొత్తం ప్రక్రియను మేము కవర్ చేస్తాము. మేము ఈ వ్యవస్థను నిర్మించడంలో కోడ్ యొక్క ప్రతి లైన్ మరియు దాని ప్రాముఖ్యతను వివరిస్తాము. నిజ-సమయ డేటా స్ట్రీమింగ్ అనేది శక్తివంతమైన సామర్ధ్యం, మరియు ఈ ఉదాహరణ మరింత సంక్లిష్టమైన వాస్తవ-ప్రపంచ అనువర్తనాలకు పునాదిగా పనిచేస్తుంది.

ఉదాహరణ 2: స్టాక్ మార్కెట్ డేటాను ఉపయోగించి పైథాన్‌లో రియల్-టైమ్ డేటా స్ట్రీమింగ్‌ను అమలు చేయడం

వేరొక దృష్టాంతాన్ని ఉపయోగించి పైథాన్‌లో నిజ-సమయ డేటా స్ట్రీమింగ్‌ను అమలు చేయడానికి మరొక ప్రత్యేకమైన ఉదాహరణను చేద్దాం; ఈసారి, మేము స్టాక్ మార్కెట్ డేటాపై దృష్టి పెడతాము. మేము Google Colabలో Apache Kafka మరియు Pythonని ఉపయోగించి స్టాక్ ధర మార్పులను క్యాప్చర్ చేసి వాటిని ప్రాసెస్ చేసే నిజ-సమయ డేటా స్ట్రీమింగ్ సిస్టమ్‌ను సృష్టిస్తాము. మునుపటి ఉదాహరణలో ప్రదర్శించినట్లుగా, మేము Google Colabలో మా వాతావరణాన్ని కాన్ఫిగర్ చేయడం ద్వారా ప్రారంభిస్తాము. మొదట, మేము అవసరమైన లైబ్రరీలను ఇన్‌స్టాల్ చేస్తాము:

! పిప్ ఇన్స్టాల్ కాఫ్కా-పైథాన్ yfinance


ఇక్కడ, మేము నిజ-సమయ స్టాక్ మార్కెట్ డేటాను పొందడానికి అనుమతించే “yfinance” లైబ్రరీని జోడిస్తాము. తరువాత, మేము అవసరమైన లైబ్రరీలను దిగుమతి చేస్తాము. మేము కాఫ్కా పరస్పర చర్య కోసం 'కాఫ్కా-పైథాన్' లైబ్రరీ నుండి 'కాఫ్కాప్రొడ్యూసర్' మరియు 'కాఫ్కా కన్స్యూమర్' తరగతులను ఉపయోగించడం కొనసాగిస్తాము. JSON డేటాతో పని చేయడానికి మేము JSONని దిగుమతి చేస్తాము. మేము నిజ-సమయ స్టాక్ మార్కెట్ డేటాను పొందడానికి “yfinance”ని కూడా ఉపయోగిస్తాము. నిజ-సమయ నవీకరణలను అనుకరించటానికి సమయ ఆలస్యాన్ని జోడించడానికి మేము 'సమయం' లైబ్రరీని కూడా దిగుమతి చేస్తాము.

కాఫ్కా దిగుమతి కాఫ్కా ప్రొడ్యూసర్, కాఫ్కా కన్స్యూమర్ నుండి
json దిగుమతి
yfinance దిగుమతి వంటి yf
దిగుమతి సమయం


ఇప్పుడు, మేము స్టాక్ డేటా కోసం కాఫ్కా ప్రొడ్యూసర్‌ని సృష్టిస్తాము. మా కాఫ్కా నిర్మాత నిజ-సమయ స్టాక్ డేటాను పొందారు మరియు దానిని 'స్టాక్-ప్రైస్' అనే కాఫ్కా టాపిక్‌కి పంపుతారు.

నిర్మాత = కాఫ్కా నిర్మాత ( bootstrap_servers = 'లోకల్ హోస్ట్:9092' ,
విలువ_సిరియలైజర్ =lambda v: json.dumps ( లో ) .ఎన్కోడ్ ( 'utf-8' ) )

అయితే నిజం:
స్టాక్ = yf.టిక్కర్ ( 'AAPL' ) # ఉదాహరణ: Apple Inc. స్టాక్
స్టాక్_డేటా = స్టాక్.చరిత్ర ( కాలం = '1d' )
last_price = స్టాక్_డేటా [ 'దగ్గరగా' ] .iloc [ - 1 ]
డేటా = { 'చిహ్నం' : 'AAPL' , 'ధర' : చివరి ధర }
నిర్మాత.పంపు ( 'స్టాక్-ధర' , సమాచారం )
సమయం.నిద్ర ( 10 ) # ప్రతి 10 సెకన్లకు నిజ-సమయ నవీకరణలను అనుకరించండి


మేము ఈ కోడ్‌లోని కాఫ్కా బ్రోకర్ చిరునామాతో “కాఫ్కాప్రొడ్యూసర్” ఉదాహరణను సృష్టిస్తాము. లూప్ లోపల, Apple Inc. ('AAPL') కోసం తాజా స్టాక్ ధరను పొందడానికి మేము 'yfinance'ని ఉపయోగిస్తాము. అప్పుడు, మేము చివరి ముగింపు ధరను సంగ్రహించి, దానిని 'స్టాక్-ధర' అంశానికి పంపుతాము. చివరికి, మేము ప్రతి 10 సెకన్లకు నిజ-సమయ నవీకరణలను అనుకరించడానికి సమయ ఆలస్యాన్ని పరిచయం చేస్తాము.

'స్టాక్-ప్రైస్' టాపిక్ నుండి స్టాక్ ధర డేటాను చదవడానికి మరియు ప్రాసెస్ చేయడానికి కాఫ్కా వినియోగదారుని సృష్టిద్దాం.

వినియోగదారు = కాఫ్కా వినియోగదారుడు ( 'స్టాక్-ధర' ,
bootstrap_servers = 'లోకల్ హోస్ట్:9092' ,
విలువ_డీసరియలైజర్ = లాంబ్డా x: json.loads ( x.డీకోడ్ ( 'utf-8' ) ) )

కోసం సందేశం లో వినియోగదారు:
స్టాక్_డేటా = సందేశం.విలువ
ముద్రణ ( f 'అందుకున్న స్టాక్ డేటా: {stock_data['symbol']} - ధర: {stock_data['price']}' )


ఈ కోడ్ మునుపటి ఉదాహరణ వినియోగదారు సెటప్ మాదిరిగానే ఉంటుంది. ఇది 'స్టాక్-ప్రైస్' టాపిక్ నుండి సందేశాలను నిరంతరం చదువుతుంది మరియు ప్రాసెస్ చేస్తుంది మరియు స్టాక్ చిహ్నాన్ని మరియు ధరను కన్సోల్‌కు ప్రింట్ చేస్తుంది. మేము కోడ్ సెల్‌లను వరుసగా అమలు చేస్తాము, ఉదా., నిర్మాత మరియు వినియోగదారుని అమలు చేయడానికి Google Colabలో ఒక్కొక్కటిగా అమలు చేస్తాము. వినియోగదారుడు ఈ డేటాను చదివేటప్పుడు మరియు ప్రదర్శించేటప్పుడు నిర్మాత నిజ-సమయ స్టాక్ ధరల నవీకరణలను అందుకుంటారు మరియు పంపుతారు.

! పిప్ ఇన్స్టాల్ కాఫ్కా-పైథాన్ yfinance
కాఫ్కా దిగుమతి కాఫ్కా ప్రొడ్యూసర్, కాఫ్కా కన్స్యూమర్ నుండి
json దిగుమతి
yfinance దిగుమతి వంటి yf
దిగుమతి సమయం
నిర్మాత = కాఫ్కా నిర్మాత ( bootstrap_servers = 'లోకల్ హోస్ట్:9092' ,
విలువ_సిరియలైజర్ =lambda v: json.dumps ( లో ) .ఎన్కోడ్ ( 'utf-8' ) )

అయితే నిజం:
స్టాక్ = yf.టిక్కర్ ( 'AAPL' ) # Apple Inc. స్టాక్
స్టాక్_డేటా = స్టాక్.చరిత్ర ( కాలం = '1d' )
last_price = స్టాక్_డేటా [ 'దగ్గరగా' ] .iloc [ - 1 ]

డేటా = { 'చిహ్నం' : 'AAPL' , 'ధర' : చివరి ధర }

నిర్మాత.పంపు ( 'స్టాక్-ధర' , సమాచారం )

సమయం.నిద్ర ( 10 ) # ప్రతి 10 సెకన్లకు నిజ-సమయ నవీకరణలను అనుకరించండి
వినియోగదారు = కాఫ్కా వినియోగదారుడు ( 'స్టాక్-ధర' ,
bootstrap_servers = 'లోకల్ హోస్ట్:9092' ,
విలువ_డీసరియలైజర్ = లాంబ్డా x: json.loads ( x.డీకోడ్ ( 'utf-8' ) ) )

కోసం సందేశం లో వినియోగదారు:
స్టాక్_డేటా = సందేశం.విలువ
ముద్రణ ( f 'అందుకున్న స్టాక్ డేటా: {stock_data['symbol']} - ధర: {stock_data['price']}' )


కోడ్ అమలు చేయబడిన తర్వాత అవుట్‌పుట్ యొక్క విశ్లేషణలో, Apple Inc. ఉత్పత్తి మరియు వినియోగించబడుతున్న వాస్తవ-సమయ స్టాక్ ధరల నవీకరణలను మేము గమనిస్తాము.

ముగింపు

ఈ ప్రత్యేకమైన ఉదాహరణలో, స్టాక్ మార్కెట్ డేటాను సంగ్రహించడానికి మరియు ప్రాసెస్ చేయడానికి Apache Kafka మరియు “yfinance” లైబ్రరీని ఉపయోగించి పైథాన్‌లో నిజ-సమయ డేటా స్ట్రీమింగ్ అమలును మేము ప్రదర్శించాము. మేము కోడ్ యొక్క ప్రతి పంక్తిని పూర్తిగా వివరించాము. ఫైనాన్స్, IoT మరియు మరిన్నింటిలో వాస్తవ-ప్రపంచ అప్లికేషన్‌లను రూపొందించడానికి రియల్-టైమ్ డేటా స్ట్రీమింగ్‌ను వివిధ ఫీల్డ్‌లకు అన్వయించవచ్చు.