విషయాల అంశం:
మీరు PySpark DataFrame మరియు మాడ్యూల్ ఇన్స్టాలేషన్ గురించి తెలుసుకోవాలనుకుంటే, దీని ద్వారా వెళ్ళండి వ్యాసం .
Pyspark.sql.functions.pandas_udf()
PySparkలోని sql.functions మాడ్యూల్లో pandas_udf () అందుబాటులో ఉంది, దీనిని 'from' కీవర్డ్ ఉపయోగించి దిగుమతి చేసుకోవచ్చు. ఇది మన PySpark DataFrameలో వెక్టరైజ్డ్ ఆపరేషన్లను నిర్వహించడానికి ఉపయోగించబడుతుంది. ఈ ఫంక్షన్ మూడు పారామితులను దాటడం ద్వారా డెకరేటర్ లాగా అమలు చేయబడుతుంది. ఆ తర్వాత, మేము బాణం ఉపయోగించి డేటాను వెక్టార్ ఫార్మాట్లో (దీని కోసం సిరీస్/NumPyని ఉపయోగిస్తాము) తిరిగి ఇచ్చే వినియోగదారు నిర్వచించిన ఫంక్షన్ని సృష్టించవచ్చు. ఈ ఫంక్షన్లో, మేము ఫలితాన్ని తిరిగి ఇవ్వగలుగుతాము.
నిర్మాణం & సింటాక్స్:
ముందుగా, ఈ ఫంక్షన్ యొక్క నిర్మాణం మరియు వాక్యనిర్మాణాన్ని చూద్దాం:
@pandas_udf(డేటాటైప్)డెఫ్ ఫంక్షన్_పేరు(ఆపరేషన్) -> convert_format:
వాపసు ప్రకటన
ఇక్కడ, ఫంక్షన్_పేరు అనేది మన నిర్వచించిన ఫంక్షన్ పేరు. డేటా రకం ఈ ఫంక్షన్ ద్వారా అందించబడిన డేటా రకాన్ని నిర్దేశిస్తుంది. మేము 'రిటర్న్' కీవర్డ్ ఉపయోగించి ఫలితాన్ని తిరిగి ఇవ్వవచ్చు. బాణం అసైన్మెంట్తో అన్ని కార్యకలాపాలు ఫంక్షన్ లోపల నిర్వహించబడతాయి.
Pandas_udf (ఫంక్షన్ మరియు రిటర్న్ టైప్)
- మొదటి పరామితి దానికి పంపబడిన వినియోగదారు నిర్వచించిన ఫంక్షన్.
- ఫంక్షన్ నుండి రిటర్న్ డేటా రకాన్ని పేర్కొనడానికి రెండవ పరామితి ఉపయోగించబడుతుంది.
సమాచారం:
ఈ మొత్తం గైడ్లో, మేము ప్రదర్శన కోసం ఒక PySpark DataFrameని మాత్రమే ఉపయోగిస్తాము. మేము నిర్వచించే అన్ని వినియోగదారు నిర్వచించిన విధులు ఈ PySpark DataFrameలో వర్తింపజేయబడతాయి. మీరు PySpark యొక్క ఇన్స్టాలేషన్ తర్వాత ముందుగా మీ వాతావరణంలో ఈ డేటాఫ్రేమ్ని సృష్టించారని నిర్ధారించుకోండి.
పైస్పార్క్ దిగుమతి
pyspark.sql దిగుమతి SparkSession నుండి
linuxhint_spark_app = SparkSession.builder.appName( 'Linux సూచన' ).getOrCreate()
pyspark.sql.functions నుండి pandas_udf దిగుమతి
pyspark.sql.types దిగుమతి నుండి *
పాండాలను పాండాగా దిగుమతి చేసుకోండి
# కూరగాయల వివరాలు
కూరగాయలు =[{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'టమోటా' , 'locate_country' : 'USA' , 'పరిమాణం' : 800 },
{ 'రకం' : 'పండు' , 'పేరు' : 'అరటి' , 'locate_country' : 'చైనా' , 'పరిమాణం' : ఇరవై },
{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'టమోటా' , 'locate_country' : 'USA' , 'పరిమాణం' : 800 },
{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'మామిడి' , 'locate_country' : 'జపాన్' , 'పరిమాణం' : 0 },
{ 'రకం' : 'పండు' , 'పేరు' : 'నిమ్మకాయ' , 'locate_country' : 'భారతదేశం' , 'పరిమాణం' : 1700 },
{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'టమోటా' , 'locate_country' : 'USA' , 'పరిమాణం' : 1200 },
{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'మామిడి' , 'locate_country' : 'జపాన్' , 'పరిమాణం' : 0 },
{ 'రకం' : 'పండు' , 'పేరు' : 'నిమ్మకాయ' , 'locate_country' : 'భారతదేశం' , 'పరిమాణం' : 0 }
]
# పై డేటా నుండి మార్కెట్ డేటాఫ్రేమ్ను సృష్టించండి
market_df = linuxhint_spark_app.createDataFrame(కూరగాయ)
market_df.show()
అవుట్పుట్:
ఇక్కడ, మేము ఈ డేటాఫ్రేమ్ను 4 నిలువు వరుసలు మరియు 8 అడ్డు వరుసలతో సృష్టిస్తాము. ఇప్పుడు, మేము వినియోగదారు నిర్వచించిన ఫంక్షన్లను సృష్టించడానికి మరియు వాటిని ఈ నిలువు వరుసలకు వర్తింపజేయడానికి pandas_udf()ని ఉపయోగిస్తాము.
Pandas_udf() వివిధ డేటా రకాలతో
ఈ దృష్టాంతంలో, మేము pandas_udf()తో కొన్ని వినియోగదారు నిర్వచించిన ఫంక్షన్లను సృష్టిస్తాము మరియు వాటిని నిలువు వరుసలపై వర్తింపజేస్తాము మరియు ఎంపిక() పద్ధతిని ఉపయోగించి ఫలితాలను ప్రదర్శిస్తాము. ప్రతి సందర్భంలో, మేము వెక్టరైజ్డ్ ఆపరేషన్లను చేస్తున్నప్పుడు పాండాలను ఉపయోగిస్తాము.సిరీస్. ఇది నిలువు వరుస విలువలను ఒక డైమెన్షనల్ శ్రేణిగా పరిగణిస్తుంది మరియు కాలమ్పై ఆపరేషన్ వర్తించబడుతుంది. డెకరేటర్లోనే, మేము ఫంక్షన్ రిటర్న్ రకాన్ని నిర్దేశిస్తాము.
ఉదాహరణ 1: స్ట్రింగ్ రకంతో Pandas_udf().
ఇక్కడ, మేము స్ట్రింగ్ రకం కాలమ్ విలువలను పెద్ద అక్షరం మరియు చిన్న అక్షరానికి మార్చడానికి స్ట్రింగ్ రిటర్న్ రకంతో రెండు వినియోగదారు నిర్వచించిన ఫంక్షన్లను సృష్టిస్తాము. చివరగా, మేము ఈ ఫంక్షన్లను “రకం” మరియు “లొకేట్_కంట్రీ” నిలువు వరుసలపై వర్తింపజేస్తాము.
# pandas_udfతో టైప్ కాలమ్ని అప్పర్ కేస్గా మార్చండి@pandas_udf(స్ట్రింగ్ టైప్())
def type_upper_case(i: panda.Series) -> panda.Series:
i.str.upper()ని తిరిగి ఇవ్వండి
# లొకేట్_కంట్రీ కాలమ్ను పాండాస్_యుడిఎఫ్తో చిన్న అక్షరానికి మార్చండి
@pandas_udf(స్ట్రింగ్ టైప్())
def country_lower_case(i: panda.Series) -> panda.Series:
i.str.lower()ని తిరిగి ఇవ్వండి
# సెలెక్ట్()ని ఉపయోగించి నిలువు వరుసలను ప్రదర్శించండి
market_df.select( 'రకం' ,type_upper_case( 'రకం' ), 'locate_country' ,
దేశం_తక్కువ_కేస్( 'locate_country' )).షో()
అవుట్పుట్:
వివరణ:
StringType() ఫంక్షన్ pyspark.sql.types మాడ్యూల్లో అందుబాటులో ఉంది. PySpark DataFrameని సృష్టించేటప్పుడు మేము ఇప్పటికే ఈ మాడ్యూల్ని దిగుమతి చేసాము.
- ముందుగా, UDF (యూజర్-డిఫైన్డ్ ఫంక్షన్) str.upper() ఫంక్షన్ని ఉపయోగించి తీగలను అప్పర్కేస్లో అందిస్తుంది. str.upper() అనేది సిరీస్ డేటా స్ట్రక్చర్లో అందుబాటులో ఉంది (మేము ఫంక్షన్ లోపల బాణంతో సిరీస్గా మారుస్తున్నందున) ఇది ఇచ్చిన స్ట్రింగ్ను పెద్ద అక్షరానికి మారుస్తుంది. చివరగా, ఈ ఫంక్షన్ ఎంపిక() పద్ధతిలో పేర్కొనబడిన “రకం” కాలమ్కు వర్తించబడుతుంది. గతంలో, టైప్ కాలమ్లోని అన్ని స్ట్రింగ్లు చిన్న అక్షరంలో ఉన్నాయి. ఇప్పుడు, అవి పెద్ద అక్షరానికి మార్చబడ్డాయి.
- రెండవది, UDF str.lower()ఫంక్షన్ని ఉపయోగించి తీగలను అప్పర్కేస్లో అందిస్తుంది. str.lower() సిరీస్ డేటా స్ట్రక్చర్లో అందుబాటులో ఉంది, ఇది ఇచ్చిన స్ట్రింగ్ను చిన్న అక్షరానికి మారుస్తుంది. చివరగా, ఈ ఫంక్షన్ ఎంపిక() పద్ధతిలో పేర్కొనబడిన “రకం” కాలమ్కు వర్తించబడుతుంది. మునుపు, టైప్ కాలమ్లోని అన్ని స్ట్రింగ్లు పెద్ద అక్షరంలో ఉన్నాయి. ఇప్పుడు, అవి చిన్న అక్షరానికి మార్చబడ్డాయి.
ఉదాహరణ 2: Pandas_udf() పూర్ణాంకం రకంతో
PySpark DataFrame పూర్ణాంకాల కాలమ్ను పాండాస్ సిరీస్గా మార్చే UDFని సృష్టిద్దాం మరియు ప్రతి విలువకు 100 జోడించండి. సెలెక్ట్() పద్ధతిలో ఈ ఫంక్షన్కి “పరిమాణం” నిలువు వరుసను పాస్ చేయండి.
# 100 జోడించండి@pandas_udf(IntegerType())
def add_100(i: panda.Series) -> panda.Series:
i+ని తిరిగి ఇవ్వండి 100
# పై ఫంక్షన్కి క్వాంటిటీ కాలమ్ని పాస్ చేసి డిస్ప్లే చేయండి.
market_df.select( 'పరిమాణం' ,జోడించు_100( 'పరిమాణం' )).షో()
అవుట్పుట్:
వివరణ:
UDF లోపల, మేము అన్ని విలువలను పునరావృతం చేస్తాము మరియు వాటిని సిరీస్గా మారుస్తాము. ఆ తర్వాత, మేము సిరీస్లోని ప్రతి విలువకు 100 జోడిస్తాము. చివరగా, మేము ఈ ఫంక్షన్కు “పరిమాణం” కాలమ్ను పాస్ చేస్తాము మరియు అన్ని విలువలకు 100 జోడించబడిందని మనం చూడవచ్చు.
Pandas_udf() గ్రూప్బై() & Agg()ని ఉపయోగించి వివిధ డేటా రకాలతో
UDFని సమగ్ర నిలువు వరుసలకు పాస్ చేయడానికి ఉదాహరణలను చూద్దాం. ఇక్కడ, కాలమ్ విలువలు మొదట groupby() ఫంక్షన్ని ఉపయోగించి సమూహం చేయబడతాయి మరియు agg() ఫంక్షన్ని ఉపయోగించి అగ్రిగేషన్ చేయబడుతుంది. మేము ఈ మొత్తం ఫంక్షన్లో మా UDFని పాస్ చేస్తాము.
సింటాక్స్:
pyspark_dataframe_object.groupby( 'గ్రూపింగ్_కాలమ్' ).agg(UDF(pyspark_dataframe_object[ 'కాలమ్' ]))
ఇక్కడ, సమూహ నిలువు వరుసలోని విలువలు ముందుగా సమూహం చేయబడతాయి. ఆపై, మా UDFకి సంబంధించి ప్రతి సమూహ డేటాపై అగ్రిగేషన్ చేయబడుతుంది.
ఉదాహరణ 1: Pandas_udf() సగటు సగటు()తో
ఇక్కడ, మేము రిటర్న్ టైప్ ఫ్లోట్తో వినియోగదారు నిర్వచించిన ఫంక్షన్ని సృష్టిస్తాము. ఫంక్షన్ లోపల, మేము సగటు() ఫంక్షన్ని ఉపయోగించి సగటును గణిస్తాము. ప్రతి రకానికి సగటు పరిమాణాన్ని పొందడానికి ఈ UDF 'పరిమాణం' నిలువు వరుసకు పంపబడుతుంది.
# సగటు/సగటును తిరిగి ఇవ్వండి@pandas_udf( 'ఫ్లోట్' )
డెఫ్ సగటు_ఫంక్షన్(i: పాండా.సిరీస్) -> ఫ్లోట్:
వాపసు i.mean()
# టైప్ కాలమ్ను సమూహపరచడం ద్వారా ఫంక్షన్కు క్వాంటిటీ కాలమ్ను పాస్ చేయండి.
market_df.groupby( 'రకం' ).agg(సగటు_ఫంక్షన్(మార్కెట్_డిఎఫ్[ 'పరిమాణం' ])).షో()
అవుట్పుట్:
మేము 'రకం' కాలమ్లోని మూలకాల ఆధారంగా సమూహాన్ని చేస్తున్నాము. రెండు సమూహాలు ఏర్పడతాయి - 'పండు' మరియు 'కూరగాయలు'. ప్రతి సమూహానికి, సగటు లెక్కించబడుతుంది మరియు తిరిగి ఇవ్వబడుతుంది.
ఉదాహరణ 2: Pandas_udf() మొత్తం గరిష్టం() మరియు Min()
ఇక్కడ, మేము పూర్ణాంకం (int) రిటర్న్ రకంతో రెండు వినియోగదారు నిర్వచించిన ఫంక్షన్లను సృష్టిస్తాము. మొదటి UDF కనిష్ట విలువను మరియు రెండవ UDF గరిష్ట విలువను అందిస్తుంది.
కనిష్ట విలువను అందించే # pandas_udf@pandas_udf( 'int' )
def min_(i: panda.Series) -> int:
i.min()ని తిరిగి ఇవ్వండి
గరిష్ట విలువను అందించే # pandas_udf
@pandas_udf( 'int' )
def max_(i: panda.Series) -> int:
i.max()ని తిరిగి ఇవ్వండి
# లొకేట్_కంట్రీని సమూహపరచడం ద్వారా పరిమాణ నిలువు వరుసను min_ pandas_udfకి పంపండి.
market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'పరిమాణం' ])).షో()
# లొకేట్_కంట్రీని గ్రూపింగ్ చేయడం ద్వారా పరిమాణ కాలమ్ను గరిష్టంగా_పాండస్_యుడిఎఫ్కి పాస్ చేయండి.
market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'పరిమాణం' ])).షో()
అవుట్పుట్:
కనిష్ట మరియు గరిష్ట విలువలను అందించడానికి, మేము UDFల రిటర్న్ రకంలో min() మరియు max() ఫంక్షన్లను ఉపయోగిస్తాము. ఇప్పుడు, మేము డేటాను “locate_country” నిలువు వరుసలో సమూహపరుస్తాము. నాలుగు సమూహాలు ఏర్పడ్డాయి ('చైనా', 'భారతదేశం', 'జపాన్', 'USA'). ప్రతి సమూహానికి, మేము గరిష్ట పరిమాణాన్ని తిరిగి ఇస్తాము. అదేవిధంగా, మేము కనీస పరిమాణాన్ని తిరిగి ఇస్తాము.
ముగింపు
ప్రాథమికంగా, మన PySpark DataFrameలో వెక్టరైజ్డ్ ఆపరేషన్లను నిర్వహించడానికి pandas_udf () ఉపయోగించబడుతుంది. మేము pandas_udf()ని ఎలా సృష్టించాలో మరియు దానిని PySpark DataFrameకి ఎలా వర్తింపజేయాలో చూశాము. మెరుగైన అవగాహన కోసం, మేము అన్ని డేటాటైప్లను (స్ట్రింగ్, ఫ్లోట్ మరియు పూర్ణాంకం) పరిగణనలోకి తీసుకోవడం ద్వారా విభిన్న ఉదాహరణలను చర్చించాము. agg() ఫంక్షన్ ద్వారా groupby()తో pandas_udf()ని ఉపయోగించడం సాధ్యమవుతుంది.