PySpark Pandas_Udf()

Pyspark Pandas Udf



PySpark DataFrameని మార్చడం pandas_udf() ఫంక్షన్‌ని ఉపయోగించి సాధ్యమవుతుంది. ఇది బాణంతో PySpark DataFrameలో వర్తించబడే వినియోగదారు నిర్వచించిన ఫంక్షన్. మేము pandas_udf()ని ఉపయోగించి వెక్టరైజ్డ్ ఆపరేషన్‌లను చేయవచ్చు. ఈ ఫంక్షన్‌ను డెకరేటర్‌గా పాస్ చేయడం ద్వారా దీన్ని అమలు చేయవచ్చు. సింటాక్స్, పారామితులు మరియు విభిన్న ఉదాహరణలను తెలుసుకోవడానికి ఈ గైడ్‌లోకి ప్రవేశిద్దాం.

విషయాల అంశం:

మీరు PySpark DataFrame మరియు మాడ్యూల్ ఇన్‌స్టాలేషన్ గురించి తెలుసుకోవాలనుకుంటే, దీని ద్వారా వెళ్ళండి వ్యాసం .







Pyspark.sql.functions.pandas_udf()

PySparkలోని sql.functions మాడ్యూల్‌లో pandas_udf () అందుబాటులో ఉంది, దీనిని 'from' కీవర్డ్ ఉపయోగించి దిగుమతి చేసుకోవచ్చు. ఇది మన PySpark DataFrameలో వెక్టరైజ్డ్ ఆపరేషన్‌లను నిర్వహించడానికి ఉపయోగించబడుతుంది. ఈ ఫంక్షన్ మూడు పారామితులను దాటడం ద్వారా డెకరేటర్ లాగా అమలు చేయబడుతుంది. ఆ తర్వాత, మేము బాణం ఉపయోగించి డేటాను వెక్టార్ ఫార్మాట్‌లో (దీని కోసం సిరీస్/NumPyని ఉపయోగిస్తాము) తిరిగి ఇచ్చే వినియోగదారు నిర్వచించిన ఫంక్షన్‌ని సృష్టించవచ్చు. ఈ ఫంక్షన్‌లో, మేము ఫలితాన్ని తిరిగి ఇవ్వగలుగుతాము.



నిర్మాణం & సింటాక్స్:



ముందుగా, ఈ ఫంక్షన్ యొక్క నిర్మాణం మరియు వాక్యనిర్మాణాన్ని చూద్దాం:

@pandas_udf(డేటాటైప్)
డెఫ్ ఫంక్షన్_పేరు(ఆపరేషన్) -> convert_format:
వాపసు ప్రకటన

ఇక్కడ, ఫంక్షన్_పేరు అనేది మన నిర్వచించిన ఫంక్షన్ పేరు. డేటా రకం ఈ ఫంక్షన్ ద్వారా అందించబడిన డేటా రకాన్ని నిర్దేశిస్తుంది. మేము 'రిటర్న్' కీవర్డ్ ఉపయోగించి ఫలితాన్ని తిరిగి ఇవ్వవచ్చు. బాణం అసైన్‌మెంట్‌తో అన్ని కార్యకలాపాలు ఫంక్షన్ లోపల నిర్వహించబడతాయి.





Pandas_udf (ఫంక్షన్ మరియు రిటర్న్ టైప్)

  1. మొదటి పరామితి దానికి పంపబడిన వినియోగదారు నిర్వచించిన ఫంక్షన్.
  2. ఫంక్షన్ నుండి రిటర్న్ డేటా రకాన్ని పేర్కొనడానికి రెండవ పరామితి ఉపయోగించబడుతుంది.

సమాచారం:

ఈ మొత్తం గైడ్‌లో, మేము ప్రదర్శన కోసం ఒక PySpark DataFrameని మాత్రమే ఉపయోగిస్తాము. మేము నిర్వచించే అన్ని వినియోగదారు నిర్వచించిన విధులు ఈ PySpark DataFrameలో వర్తింపజేయబడతాయి. మీరు PySpark యొక్క ఇన్‌స్టాలేషన్ తర్వాత ముందుగా మీ వాతావరణంలో ఈ డేటాఫ్రేమ్‌ని సృష్టించారని నిర్ధారించుకోండి.



పైస్పార్క్ దిగుమతి

pyspark.sql దిగుమతి SparkSession నుండి

linuxhint_spark_app = SparkSession.builder.appName( 'Linux సూచన' ).getOrCreate()

pyspark.sql.functions నుండి pandas_udf దిగుమతి

pyspark.sql.types దిగుమతి నుండి *

పాండాలను పాండాగా దిగుమతి చేసుకోండి

# కూరగాయల వివరాలు

కూరగాయలు =[{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'టమోటా' , 'locate_country' : 'USA' , 'పరిమాణం' : 800 },

{ 'రకం' : 'పండు' , 'పేరు' : 'అరటి' , 'locate_country' : 'చైనా' , 'పరిమాణం' : ఇరవై },

{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'టమోటా' , 'locate_country' : 'USA' , 'పరిమాణం' : 800 },

{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'మామిడి' , 'locate_country' : 'జపాన్' , 'పరిమాణం' : 0 },

{ 'రకం' : 'పండు' , 'పేరు' : 'నిమ్మకాయ' , 'locate_country' : 'భారతదేశం' , 'పరిమాణం' : 1700 },

{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'టమోటా' , 'locate_country' : 'USA' , 'పరిమాణం' : 1200 },

{ 'రకం' : 'కూరగాయ' , 'పేరు' : 'మామిడి' , 'locate_country' : 'జపాన్' , 'పరిమాణం' : 0 },

{ 'రకం' : 'పండు' , 'పేరు' : 'నిమ్మకాయ' , 'locate_country' : 'భారతదేశం' , 'పరిమాణం' : 0 }

]

# పై డేటా నుండి మార్కెట్ డేటాఫ్రేమ్‌ను సృష్టించండి

market_df = linuxhint_spark_app.createDataFrame(కూరగాయ)

market_df.show()

అవుట్‌పుట్:

ఇక్కడ, మేము ఈ డేటాఫ్రేమ్‌ను 4 నిలువు వరుసలు మరియు 8 అడ్డు వరుసలతో సృష్టిస్తాము. ఇప్పుడు, మేము వినియోగదారు నిర్వచించిన ఫంక్షన్‌లను సృష్టించడానికి మరియు వాటిని ఈ నిలువు వరుసలకు వర్తింపజేయడానికి pandas_udf()ని ఉపయోగిస్తాము.

Pandas_udf() వివిధ డేటా రకాలతో

ఈ దృష్టాంతంలో, మేము pandas_udf()తో కొన్ని వినియోగదారు నిర్వచించిన ఫంక్షన్‌లను సృష్టిస్తాము మరియు వాటిని నిలువు వరుసలపై వర్తింపజేస్తాము మరియు ఎంపిక() పద్ధతిని ఉపయోగించి ఫలితాలను ప్రదర్శిస్తాము. ప్రతి సందర్భంలో, మేము వెక్టరైజ్డ్ ఆపరేషన్లను చేస్తున్నప్పుడు పాండాలను ఉపయోగిస్తాము.సిరీస్. ఇది నిలువు వరుస విలువలను ఒక డైమెన్షనల్ శ్రేణిగా పరిగణిస్తుంది మరియు కాలమ్‌పై ఆపరేషన్ వర్తించబడుతుంది. డెకరేటర్‌లోనే, మేము ఫంక్షన్ రిటర్న్ రకాన్ని నిర్దేశిస్తాము.

ఉదాహరణ 1: స్ట్రింగ్ రకంతో Pandas_udf().

ఇక్కడ, మేము స్ట్రింగ్ రకం కాలమ్ విలువలను పెద్ద అక్షరం మరియు చిన్న అక్షరానికి మార్చడానికి స్ట్రింగ్ రిటర్న్ రకంతో రెండు వినియోగదారు నిర్వచించిన ఫంక్షన్‌లను సృష్టిస్తాము. చివరగా, మేము ఈ ఫంక్షన్‌లను “రకం” మరియు “లొకేట్_కంట్రీ” నిలువు వరుసలపై వర్తింపజేస్తాము.

# pandas_udfతో టైప్ కాలమ్‌ని అప్పర్ కేస్‌గా మార్చండి

@pandas_udf(స్ట్రింగ్ టైప్())

def type_upper_case(i: panda.Series) -> panda.Series:

i.str.upper()ని తిరిగి ఇవ్వండి

# లొకేట్_కంట్రీ కాలమ్‌ను పాండాస్_యుడిఎఫ్‌తో చిన్న అక్షరానికి మార్చండి

@pandas_udf(స్ట్రింగ్ టైప్())

def country_lower_case(i: panda.Series) -> panda.Series:

i.str.lower()ని తిరిగి ఇవ్వండి

# సెలెక్ట్()ని ఉపయోగించి నిలువు వరుసలను ప్రదర్శించండి

market_df.select( 'రకం' ,type_upper_case( 'రకం' ), 'locate_country' ,
దేశం_తక్కువ_కేస్( 'locate_country' )).షో()

అవుట్‌పుట్:

వివరణ:

StringType() ఫంక్షన్ pyspark.sql.types మాడ్యూల్‌లో అందుబాటులో ఉంది. PySpark DataFrameని సృష్టించేటప్పుడు మేము ఇప్పటికే ఈ మాడ్యూల్‌ని దిగుమతి చేసాము.

  1. ముందుగా, UDF (యూజర్-డిఫైన్డ్ ఫంక్షన్) str.upper() ఫంక్షన్‌ని ఉపయోగించి తీగలను అప్పర్‌కేస్‌లో అందిస్తుంది. str.upper() అనేది సిరీస్ డేటా స్ట్రక్చర్‌లో అందుబాటులో ఉంది (మేము ఫంక్షన్ లోపల బాణంతో సిరీస్‌గా మారుస్తున్నందున) ఇది ఇచ్చిన స్ట్రింగ్‌ను పెద్ద అక్షరానికి మారుస్తుంది. చివరగా, ఈ ఫంక్షన్ ఎంపిక() పద్ధతిలో పేర్కొనబడిన “రకం” కాలమ్‌కు వర్తించబడుతుంది. గతంలో, టైప్ కాలమ్‌లోని అన్ని స్ట్రింగ్‌లు చిన్న అక్షరంలో ఉన్నాయి. ఇప్పుడు, అవి పెద్ద అక్షరానికి మార్చబడ్డాయి.
  2. రెండవది, UDF str.lower()ఫంక్షన్‌ని ఉపయోగించి తీగలను అప్పర్‌కేస్‌లో అందిస్తుంది. str.lower() సిరీస్ డేటా స్ట్రక్చర్‌లో అందుబాటులో ఉంది, ఇది ఇచ్చిన స్ట్రింగ్‌ను చిన్న అక్షరానికి మారుస్తుంది. చివరగా, ఈ ఫంక్షన్ ఎంపిక() పద్ధతిలో పేర్కొనబడిన “రకం” కాలమ్‌కు వర్తించబడుతుంది. మునుపు, టైప్ కాలమ్‌లోని అన్ని స్ట్రింగ్‌లు పెద్ద అక్షరంలో ఉన్నాయి. ఇప్పుడు, అవి చిన్న అక్షరానికి మార్చబడ్డాయి.

ఉదాహరణ 2: Pandas_udf() పూర్ణాంకం రకంతో

PySpark DataFrame పూర్ణాంకాల కాలమ్‌ను పాండాస్ సిరీస్‌గా మార్చే UDFని సృష్టిద్దాం మరియు ప్రతి విలువకు 100 జోడించండి. సెలెక్ట్() పద్ధతిలో ఈ ఫంక్షన్‌కి “పరిమాణం” నిలువు వరుసను పాస్ చేయండి.

# 100 జోడించండి

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

i+ని తిరిగి ఇవ్వండి 100

# పై ఫంక్షన్‌కి క్వాంటిటీ కాలమ్‌ని పాస్ చేసి డిస్‌ప్లే చేయండి.

market_df.select( 'పరిమాణం' ,జోడించు_100( 'పరిమాణం' )).షో()

అవుట్‌పుట్:

వివరణ:

UDF లోపల, మేము అన్ని విలువలను పునరావృతం చేస్తాము మరియు వాటిని సిరీస్‌గా మారుస్తాము. ఆ తర్వాత, మేము సిరీస్‌లోని ప్రతి విలువకు 100 జోడిస్తాము. చివరగా, మేము ఈ ఫంక్షన్‌కు “పరిమాణం” కాలమ్‌ను పాస్ చేస్తాము మరియు అన్ని విలువలకు 100 జోడించబడిందని మనం చూడవచ్చు.

Pandas_udf() గ్రూప్‌బై() & Agg()ని ఉపయోగించి వివిధ డేటా రకాలతో

UDFని సమగ్ర నిలువు వరుసలకు పాస్ చేయడానికి ఉదాహరణలను చూద్దాం. ఇక్కడ, కాలమ్ విలువలు మొదట groupby() ఫంక్షన్‌ని ఉపయోగించి సమూహం చేయబడతాయి మరియు agg() ఫంక్షన్‌ని ఉపయోగించి అగ్రిగేషన్ చేయబడుతుంది. మేము ఈ మొత్తం ఫంక్షన్‌లో మా UDFని పాస్ చేస్తాము.

సింటాక్స్:

pyspark_dataframe_object.groupby( 'గ్రూపింగ్_కాలమ్' ).agg(UDF
(pyspark_dataframe_object[ 'కాలమ్' ]))

ఇక్కడ, సమూహ నిలువు వరుసలోని విలువలు ముందుగా సమూహం చేయబడతాయి. ఆపై, మా UDFకి సంబంధించి ప్రతి సమూహ డేటాపై అగ్రిగేషన్ చేయబడుతుంది.

ఉదాహరణ 1: Pandas_udf() సగటు సగటు()తో

ఇక్కడ, మేము రిటర్న్ టైప్ ఫ్లోట్‌తో వినియోగదారు నిర్వచించిన ఫంక్షన్‌ని సృష్టిస్తాము. ఫంక్షన్ లోపల, మేము సగటు() ఫంక్షన్‌ని ఉపయోగించి సగటును గణిస్తాము. ప్రతి రకానికి సగటు పరిమాణాన్ని పొందడానికి ఈ UDF 'పరిమాణం' నిలువు వరుసకు పంపబడుతుంది.

# సగటు/సగటును తిరిగి ఇవ్వండి

@pandas_udf( 'ఫ్లోట్' )

డెఫ్ సగటు_ఫంక్షన్(i: పాండా.సిరీస్) -> ఫ్లోట్:

వాపసు i.mean()

# టైప్ కాలమ్‌ను సమూహపరచడం ద్వారా ఫంక్షన్‌కు క్వాంటిటీ కాలమ్‌ను పాస్ చేయండి.

market_df.groupby( 'రకం' ).agg(సగటు_ఫంక్షన్(మార్కెట్_డిఎఫ్[ 'పరిమాణం' ])).షో()

అవుట్‌పుట్:

మేము 'రకం' కాలమ్‌లోని మూలకాల ఆధారంగా సమూహాన్ని చేస్తున్నాము. రెండు సమూహాలు ఏర్పడతాయి - 'పండు' మరియు 'కూరగాయలు'. ప్రతి సమూహానికి, సగటు లెక్కించబడుతుంది మరియు తిరిగి ఇవ్వబడుతుంది.

ఉదాహరణ 2: Pandas_udf() మొత్తం గరిష్టం() మరియు Min()

ఇక్కడ, మేము పూర్ణాంకం (int) రిటర్న్ రకంతో రెండు వినియోగదారు నిర్వచించిన ఫంక్షన్‌లను సృష్టిస్తాము. మొదటి UDF కనిష్ట విలువను మరియు రెండవ UDF గరిష్ట విలువను అందిస్తుంది.

కనిష్ట విలువను అందించే # pandas_udf

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

i.min()ని తిరిగి ఇవ్వండి

గరిష్ట విలువను అందించే # pandas_udf

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

i.max()ని తిరిగి ఇవ్వండి

# లొకేట్_కంట్రీని సమూహపరచడం ద్వారా పరిమాణ నిలువు వరుసను min_ pandas_udfకి పంపండి.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'పరిమాణం' ])).షో()

# లొకేట్_కంట్రీని గ్రూపింగ్ చేయడం ద్వారా పరిమాణ కాలమ్‌ను గరిష్టంగా_పాండస్_యుడిఎఫ్‌కి పాస్ చేయండి.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'పరిమాణం' ])).షో()

అవుట్‌పుట్:

కనిష్ట మరియు గరిష్ట విలువలను అందించడానికి, మేము UDFల రిటర్న్ రకంలో min() మరియు max() ఫంక్షన్‌లను ఉపయోగిస్తాము. ఇప్పుడు, మేము డేటాను “locate_country” నిలువు వరుసలో సమూహపరుస్తాము. నాలుగు సమూహాలు ఏర్పడ్డాయి ('చైనా', 'భారతదేశం', 'జపాన్', 'USA'). ప్రతి సమూహానికి, మేము గరిష్ట పరిమాణాన్ని తిరిగి ఇస్తాము. అదేవిధంగా, మేము కనీస పరిమాణాన్ని తిరిగి ఇస్తాము.

ముగింపు

ప్రాథమికంగా, మన PySpark DataFrameలో వెక్టరైజ్డ్ ఆపరేషన్‌లను నిర్వహించడానికి pandas_udf () ఉపయోగించబడుతుంది. మేము pandas_udf()ని ఎలా సృష్టించాలో మరియు దానిని PySpark DataFrameకి ఎలా వర్తింపజేయాలో చూశాము. మెరుగైన అవగాహన కోసం, మేము అన్ని డేటాటైప్‌లను (స్ట్రింగ్, ఫ్లోట్ మరియు పూర్ణాంకం) పరిగణనలోకి తీసుకోవడం ద్వారా విభిన్న ఉదాహరణలను చర్చించాము. agg() ఫంక్షన్ ద్వారా groupby()తో pandas_udf()ని ఉపయోగించడం సాధ్యమవుతుంది.