PySparkలో టేబుల్ డేటాను చదవడం మరియు వ్రాయడం ఎలా

Pysparklo Tebul Detanu Cadavadam Mariyu Vrayadam Ela



టేబుల్ రూపంలో డేటా లోడ్ అయినట్లయితే PySparkలో డేటా ప్రాసెసింగ్ వేగంగా జరుగుతుంది. దీనితో, SQl వ్యక్తీకరణలను ఉపయోగించి, ప్రాసెసింగ్ త్వరగా జరుగుతుంది. కాబట్టి, PySpark DataFrame/RDDని ప్రాసెసింగ్ కోసం పంపే ముందు టేబుల్‌గా మార్చడం మంచి విధానం. ఈ రోజు, మేము PySpark DataFrameలో టేబుల్ డేటాను ఎలా చదవాలో, PySpark DataFrameని టేబుల్‌కి ఎలా వ్రాయాలో మరియు బిల్ట్-ఇన్ ఫంక్షన్‌లను ఉపయోగించి ఇప్పటికే ఉన్న టేబుల్‌కి కొత్త డేటాఫ్రేమ్‌ను ఎలా చొప్పించాలో చూద్దాం. వెళ్దాం!

Pyspark.sql.DataFrameWriter.saveAsTable()

ముందుగా, రైట్.సేవ్ఏస్టేబుల్() ఫంక్షన్‌ని ఉపయోగించి ఇప్పటికే ఉన్న పైస్పార్క్ డేటాఫ్రేమ్‌ను టేబుల్‌లోకి ఎలా వ్రాయాలో చూద్దాం. డేటాఫ్రేమ్‌ను టేబుల్‌కి వ్రాయడానికి ఇది టేబుల్ పేరు మరియు మోడ్‌లు, partionBy మొదలైన ఇతర ఐచ్ఛిక పారామితులను తీసుకుంటుంది. ఇది పారేకెట్ ఫైల్‌గా నిల్వ చేయబడుతుంది.

సింటాక్స్:







dataframe_obj.write.saveAsTable(మార్గం/టేబుల్_పేరు, మోడ్, విభజన ద్వారా,...)
  1. Table_name అనేది dataframe_obj నుండి సృష్టించబడిన పట్టిక పేరు.
  2. మేము మోడ్ పరామితిని ఉపయోగించి పట్టిక యొక్క డేటాను జోడించవచ్చు/ఓవర్‌రైట్ చేయవచ్చు.
  3. ఈ అందించిన నిలువు వరుసలలోని విలువల ఆధారంగా విభజనలను సృష్టించడానికి partitionBy సింగిల్/మల్టిపుల్ నిలువు వరుసలను తీసుకుంటుంది.

ఉదాహరణ 1:

5 అడ్డు వరుసలు మరియు 4 నిలువు వరుసలతో PySpark DataFrameని సృష్టించండి. ఈ డేటాఫ్రేమ్‌ను “Agri_Table1” అనే పట్టికకు వ్రాయండి.



పైస్పార్క్ దిగుమతి

pyspark.sql దిగుమతి SparkSession నుండి

linuxhint_spark_app = SparkSession.builder.appName( 'Linux సూచన' ).getOrCreate()

5 అడ్డు వరుసలు మరియు 5 నిలువు వరుసలతో # వ్యవసాయ డేటా

అగ్రి =[{ 'నేల_రకం' : 'నలుపు' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 2500 , 'నేల_స్థితి' : 'పొడి' ,
'దేశం' : 'USA' },

{ 'నేల_రకం' : 'నలుపు' , 'నీటిపారుదల_లభ్యత' : 'అవును' , 'ఎకరాలు' : 3500 , 'నేల_స్థితి' : 'తడి' ,
'దేశం' : 'భారతదేశం' },

{ 'నేల_రకం' : 'ఎరుపు' , 'నీటిపారుదల_లభ్యత' : 'అవును' , 'ఎకరాలు' : 210 , 'నేల_స్థితి' : 'పొడి' ,
'దేశం' : 'UK' },

{ 'నేల_రకం' : 'ఇతర' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 1000 , 'నేల_స్థితి' : 'తడి' ,
'దేశం' : 'USA' },

{ 'నేల_రకం' : 'ఇసుక' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 500 , 'నేల_స్థితి' : 'పొడి' ,
'దేశం' : 'భారతదేశం' }]



# పై డేటా నుండి డేటాఫ్రేమ్‌ను సృష్టించండి

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# పై డేటా ఫ్రేమ్‌ని టేబుల్‌కి వ్రాయండి.

agri_df.coalesce( 1 ).write.saveAsTable( 'అగ్రి_టేబుల్1' )

అవుట్‌పుట్:







మునుపటి PySpark డేటాతో ఒక parquet ఫైల్ సృష్టించబడిందని మనం చూడవచ్చు.



ఉదాహరణ 2:

మునుపటి డేటాఫ్రేమ్‌ను పరిగణించండి మరియు 'దేశం' కాలమ్‌లోని విలువల ఆధారంగా రికార్డులను విభజించడం ద్వారా పట్టికలో 'Agri_Table2'ని వ్రాయండి.

# పైన పేర్కొన్న డేటాఫ్రేమ్‌ను పార్టిషన్ బై పారామీటర్‌తో టేబుల్‌కి వ్రాయండి

agri_df.write.saveAsTable( 'అగ్రి_టేబుల్2' ,విభజనద్వారా=[ 'దేశం' ])

అవుట్‌పుట్:

'దేశం' కాలమ్‌లో మూడు ప్రత్యేక విలువలు ఉన్నాయి - 'భారతదేశం', 'UK' మరియు 'USA'. కాబట్టి, మూడు విభజనలు సృష్టించబడతాయి. ప్రతి విభజన పారేకెట్ ఫైళ్లను కలిగి ఉంటుంది.

Pyspark.sql.DataFrameReader.table()

spark.read.table() ఫంక్షన్‌ని ఉపయోగించి పట్టికను PySpark DataFrameలోకి లోడ్ చేద్దాం. ఇది పాత్/టేబుల్ పేరు అనే ఒక పరామితిని మాత్రమే తీసుకుంటుంది. ఇది నేరుగా టేబుల్‌ను PySpark DataFrameలోకి లోడ్ చేస్తుంది మరియు PySpark DataFrameకి వర్తించే అన్ని SQL ఫంక్షన్‌లను కూడా ఈ లోడ్ చేయబడిన డేటాఫ్రేమ్‌లో వర్తింపజేయవచ్చు.

సింటాక్స్:

spark_app.read.table(మార్గం/'టేబుల్_పేరు')

ఈ దృష్టాంతంలో, మేము PySpark DataFrame నుండి సృష్టించబడిన మునుపటి పట్టికను ఉపయోగిస్తాము. మీరు మీ వాతావరణంలో మునుపటి దృష్టాంత కోడ్ స్నిప్పెట్‌లను అమలు చేయాలని నిర్ధారించుకోండి.

ఉదాహరణ:

“loaded_data” పేరుతో ఉన్న డేటాఫ్రేమ్‌లో “Agri_Table1” పట్టికను లోడ్ చేయండి.

loaded_data = linuxhint_spark_app.read.table( 'అగ్రి_టేబుల్1' )

loaded_data.show()

అవుట్‌పుట్:

PySpark DataFrameలో పట్టిక లోడ్ చేయబడిందని మనం చూడవచ్చు.

SQL ప్రశ్నలను అమలు చేస్తోంది

ఇప్పుడు, మేము spark.sql() ఫంక్షన్‌ని ఉపయోగించి లోడ్ చేయబడిన డేటాఫ్రేమ్‌లో కొన్ని SQL ప్రశ్నలను అమలు చేస్తాము.

# పై పట్టిక నుండి అన్ని నిలువు వరుసలను ప్రదర్శించడానికి SELECT ఆదేశాన్ని ఉపయోగించండి.

linuxhint_spark_app.sql( 'అగ్రి_టేబుల్1 నుండి * ఎంచుకోండి' ).show()

# ఎక్కడ నిబంధన

linuxhint_spark_app.sql( 'అగ్రి_టేబుల్1 నుండి *ని ఎంచుకోండి ఎక్కడ నేల_స్థితి='పొడి' ' ).show()

linuxhint_spark_app.sql( 'ఎకరాలు > 2000 ఎక్కడ అగ్రి_టేబుల్1 నుండి * ఎంచుకోండి' ).show()

అవుట్‌పుట్:

  1. మొదటి ప్రశ్న డేటాఫ్రేమ్ నుండి అన్ని నిలువు వరుసలు మరియు రికార్డులను ప్రదర్శిస్తుంది.
  2. రెండవ ప్రశ్న 'Soil_status' నిలువు వరుస ఆధారంగా రికార్డులను ప్రదర్శిస్తుంది. 'డ్రై' మూలకంతో మూడు రికార్డులు మాత్రమే ఉన్నాయి.
  3. చివరి ప్రశ్న 2000 కంటే ఎక్కువ 'ఎకరాలు'తో రెండు రికార్డ్‌లను అందిస్తుంది.

Pyspark.sql.DataFrameWriter.insertInto()

insertInto() ఫంక్షన్‌ని ఉపయోగించి, మేము ఇప్పటికే ఉన్న పట్టికలో డేటాఫ్రేమ్‌ను జోడించవచ్చు. కాలమ్ పేర్లను నిర్వచించడానికి, ఆపై దానిని టేబుల్‌లోకి చొప్పించడానికి మేము selectExpr()తో పాటు ఈ ఫంక్షన్‌ను ఉపయోగించవచ్చు. ఈ ఫంక్షన్ టేబుల్‌నేమ్‌ను కూడా పారామీటర్‌గా తీసుకుంటుంది.

సింటాక్స్:

DataFrame_obj.write.insertInto('టేబుల్_పేరు')

ఈ దృష్టాంతంలో, మేము PySpark DataFrame నుండి సృష్టించబడిన మునుపటి పట్టికను ఉపయోగిస్తాము. మీరు మీ వాతావరణంలో మునుపటి దృష్టాంత కోడ్ స్నిప్పెట్‌లను అమలు చేయాలని నిర్ధారించుకోండి.

ఉదాహరణ:

రెండు రికార్డ్‌లతో కొత్త డేటాఫ్రేమ్‌ని సృష్టించండి మరియు వాటిని “Agri_Table1” పట్టికలో చొప్పించండి.

పైస్పార్క్ దిగుమతి

pyspark.sql దిగుమతి SparkSession నుండి

linuxhint_spark_app = SparkSession.builder.appName( 'Linux సూచన' ).getOrCreate()

2 అడ్డు వరుసలతో # వ్యవసాయ డేటా

అగ్రి =[{ 'నేల_రకం' : 'ఇసుక' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 2500 , 'నేల_స్థితి' : 'పొడి' ,
'దేశం' : 'USA' },

{ 'నేల_రకం' : 'ఇసుక' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 1200 , 'నేల_స్థితి' : 'తడి' ,
'దేశం' : 'జపాన్' }]

# పై డేటా నుండి డేటాఫ్రేమ్‌ను సృష్టించండి

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'ఎకరాలు' , 'దేశం' , 'నీటిపారుదల_లభ్యత' , 'నేల_రకం' ,
'నేల_స్థితి' ).write.insertInto( 'అగ్రి_టేబుల్1' )

# చివరి అగ్రి_టేబుల్ 1ని ప్రదర్శించండి

linuxhint_spark_app.sql( 'అగ్రి_టేబుల్1 నుండి * ఎంచుకోండి' ).show()

అవుట్‌పుట్:

ఇప్పుడు, డేటాఫ్రేమ్‌లో ఉన్న మొత్తం అడ్డు వరుసల సంఖ్య 7.

ముగింపు

మీరు ఇప్పుడు write.saveAsTable() ఫంక్షన్‌ని ఉపయోగించి PySpark DataFrameని టేబుల్‌కి ఎలా వ్రాయాలో అర్థం చేసుకున్నారు. ఇది పట్టిక పేరు మరియు ఇతర ఐచ్ఛిక పారామితులను తీసుకుంటుంది. అప్పుడు, మేము ఈ పట్టికను spark.read.table() ఫంక్షన్‌ని ఉపయోగించి PySpark DataFrameలోకి లోడ్ చేసాము. ఇది పాత్/టేబుల్ పేరు అనే ఒక పరామితిని మాత్రమే తీసుకుంటుంది. మీరు ఇప్పటికే ఉన్న పట్టికలో కొత్త డేటాఫ్రేమ్‌ను జోడించాలనుకుంటే, ఇన్‌సర్ట్‌ఇంటో() ఫంక్షన్‌ని ఉపయోగించండి.