Pyspark.sql.DataFrameWriter.saveAsTable()
ముందుగా, రైట్.సేవ్ఏస్టేబుల్() ఫంక్షన్ని ఉపయోగించి ఇప్పటికే ఉన్న పైస్పార్క్ డేటాఫ్రేమ్ను టేబుల్లోకి ఎలా వ్రాయాలో చూద్దాం. డేటాఫ్రేమ్ను టేబుల్కి వ్రాయడానికి ఇది టేబుల్ పేరు మరియు మోడ్లు, partionBy మొదలైన ఇతర ఐచ్ఛిక పారామితులను తీసుకుంటుంది. ఇది పారేకెట్ ఫైల్గా నిల్వ చేయబడుతుంది.
సింటాక్స్:
dataframe_obj.write.saveAsTable(మార్గం/టేబుల్_పేరు, మోడ్, విభజన ద్వారా,...)
- Table_name అనేది dataframe_obj నుండి సృష్టించబడిన పట్టిక పేరు.
- మేము మోడ్ పరామితిని ఉపయోగించి పట్టిక యొక్క డేటాను జోడించవచ్చు/ఓవర్రైట్ చేయవచ్చు.
- ఈ అందించిన నిలువు వరుసలలోని విలువల ఆధారంగా విభజనలను సృష్టించడానికి partitionBy సింగిల్/మల్టిపుల్ నిలువు వరుసలను తీసుకుంటుంది.
ఉదాహరణ 1:
5 అడ్డు వరుసలు మరియు 4 నిలువు వరుసలతో PySpark DataFrameని సృష్టించండి. ఈ డేటాఫ్రేమ్ను “Agri_Table1” అనే పట్టికకు వ్రాయండి.
పైస్పార్క్ దిగుమతి
pyspark.sql దిగుమతి SparkSession నుండి
linuxhint_spark_app = SparkSession.builder.appName( 'Linux సూచన' ).getOrCreate()
5 అడ్డు వరుసలు మరియు 5 నిలువు వరుసలతో # వ్యవసాయ డేటా
అగ్రి =[{ 'నేల_రకం' : 'నలుపు' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 2500 , 'నేల_స్థితి' : 'పొడి' ,
'దేశం' : 'USA' },
{ 'నేల_రకం' : 'నలుపు' , 'నీటిపారుదల_లభ్యత' : 'అవును' , 'ఎకరాలు' : 3500 , 'నేల_స్థితి' : 'తడి' ,
'దేశం' : 'భారతదేశం' },
{ 'నేల_రకం' : 'ఎరుపు' , 'నీటిపారుదల_లభ్యత' : 'అవును' , 'ఎకరాలు' : 210 , 'నేల_స్థితి' : 'పొడి' ,
'దేశం' : 'UK' },
{ 'నేల_రకం' : 'ఇతర' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 1000 , 'నేల_స్థితి' : 'తడి' ,
'దేశం' : 'USA' },
{ 'నేల_రకం' : 'ఇసుక' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 500 , 'నేల_స్థితి' : 'పొడి' ,
'దేశం' : 'భారతదేశం' }]
# పై డేటా నుండి డేటాఫ్రేమ్ను సృష్టించండి
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# పై డేటా ఫ్రేమ్ని టేబుల్కి వ్రాయండి.
agri_df.coalesce( 1 ).write.saveAsTable( 'అగ్రి_టేబుల్1' )
అవుట్పుట్:
మునుపటి PySpark డేటాతో ఒక parquet ఫైల్ సృష్టించబడిందని మనం చూడవచ్చు.
ఉదాహరణ 2:
మునుపటి డేటాఫ్రేమ్ను పరిగణించండి మరియు 'దేశం' కాలమ్లోని విలువల ఆధారంగా రికార్డులను విభజించడం ద్వారా పట్టికలో 'Agri_Table2'ని వ్రాయండి.
# పైన పేర్కొన్న డేటాఫ్రేమ్ను పార్టిషన్ బై పారామీటర్తో టేబుల్కి వ్రాయండిagri_df.write.saveAsTable( 'అగ్రి_టేబుల్2' ,విభజనద్వారా=[ 'దేశం' ])
అవుట్పుట్:
'దేశం' కాలమ్లో మూడు ప్రత్యేక విలువలు ఉన్నాయి - 'భారతదేశం', 'UK' మరియు 'USA'. కాబట్టి, మూడు విభజనలు సృష్టించబడతాయి. ప్రతి విభజన పారేకెట్ ఫైళ్లను కలిగి ఉంటుంది.
Pyspark.sql.DataFrameReader.table()
spark.read.table() ఫంక్షన్ని ఉపయోగించి పట్టికను PySpark DataFrameలోకి లోడ్ చేద్దాం. ఇది పాత్/టేబుల్ పేరు అనే ఒక పరామితిని మాత్రమే తీసుకుంటుంది. ఇది నేరుగా టేబుల్ను PySpark DataFrameలోకి లోడ్ చేస్తుంది మరియు PySpark DataFrameకి వర్తించే అన్ని SQL ఫంక్షన్లను కూడా ఈ లోడ్ చేయబడిన డేటాఫ్రేమ్లో వర్తింపజేయవచ్చు.
సింటాక్స్:
spark_app.read.table(మార్గం/'టేబుల్_పేరు')ఈ దృష్టాంతంలో, మేము PySpark DataFrame నుండి సృష్టించబడిన మునుపటి పట్టికను ఉపయోగిస్తాము. మీరు మీ వాతావరణంలో మునుపటి దృష్టాంత కోడ్ స్నిప్పెట్లను అమలు చేయాలని నిర్ధారించుకోండి.
ఉదాహరణ:
“loaded_data” పేరుతో ఉన్న డేటాఫ్రేమ్లో “Agri_Table1” పట్టికను లోడ్ చేయండి.
loaded_data = linuxhint_spark_app.read.table( 'అగ్రి_టేబుల్1' )loaded_data.show()
అవుట్పుట్:
PySpark DataFrameలో పట్టిక లోడ్ చేయబడిందని మనం చూడవచ్చు.
SQL ప్రశ్నలను అమలు చేస్తోంది
ఇప్పుడు, మేము spark.sql() ఫంక్షన్ని ఉపయోగించి లోడ్ చేయబడిన డేటాఫ్రేమ్లో కొన్ని SQL ప్రశ్నలను అమలు చేస్తాము.
# పై పట్టిక నుండి అన్ని నిలువు వరుసలను ప్రదర్శించడానికి SELECT ఆదేశాన్ని ఉపయోగించండి.linuxhint_spark_app.sql( 'అగ్రి_టేబుల్1 నుండి * ఎంచుకోండి' ).show()
# ఎక్కడ నిబంధన
linuxhint_spark_app.sql( 'అగ్రి_టేబుల్1 నుండి *ని ఎంచుకోండి ఎక్కడ నేల_స్థితి='పొడి' ' ).show()
linuxhint_spark_app.sql( 'ఎకరాలు > 2000 ఎక్కడ అగ్రి_టేబుల్1 నుండి * ఎంచుకోండి' ).show()
అవుట్పుట్:
- మొదటి ప్రశ్న డేటాఫ్రేమ్ నుండి అన్ని నిలువు వరుసలు మరియు రికార్డులను ప్రదర్శిస్తుంది.
- రెండవ ప్రశ్న 'Soil_status' నిలువు వరుస ఆధారంగా రికార్డులను ప్రదర్శిస్తుంది. 'డ్రై' మూలకంతో మూడు రికార్డులు మాత్రమే ఉన్నాయి.
- చివరి ప్రశ్న 2000 కంటే ఎక్కువ 'ఎకరాలు'తో రెండు రికార్డ్లను అందిస్తుంది.
Pyspark.sql.DataFrameWriter.insertInto()
insertInto() ఫంక్షన్ని ఉపయోగించి, మేము ఇప్పటికే ఉన్న పట్టికలో డేటాఫ్రేమ్ను జోడించవచ్చు. కాలమ్ పేర్లను నిర్వచించడానికి, ఆపై దానిని టేబుల్లోకి చొప్పించడానికి మేము selectExpr()తో పాటు ఈ ఫంక్షన్ను ఉపయోగించవచ్చు. ఈ ఫంక్షన్ టేబుల్నేమ్ను కూడా పారామీటర్గా తీసుకుంటుంది.
సింటాక్స్:
DataFrame_obj.write.insertInto('టేబుల్_పేరు')ఈ దృష్టాంతంలో, మేము PySpark DataFrame నుండి సృష్టించబడిన మునుపటి పట్టికను ఉపయోగిస్తాము. మీరు మీ వాతావరణంలో మునుపటి దృష్టాంత కోడ్ స్నిప్పెట్లను అమలు చేయాలని నిర్ధారించుకోండి.
ఉదాహరణ:
రెండు రికార్డ్లతో కొత్త డేటాఫ్రేమ్ని సృష్టించండి మరియు వాటిని “Agri_Table1” పట్టికలో చొప్పించండి.
పైస్పార్క్ దిగుమతిpyspark.sql దిగుమతి SparkSession నుండి
linuxhint_spark_app = SparkSession.builder.appName( 'Linux సూచన' ).getOrCreate()
2 అడ్డు వరుసలతో # వ్యవసాయ డేటా
అగ్రి =[{ 'నేల_రకం' : 'ఇసుక' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 2500 , 'నేల_స్థితి' : 'పొడి' ,
'దేశం' : 'USA' },
{ 'నేల_రకం' : 'ఇసుక' , 'నీటిపారుదల_లభ్యత' : 'లేదు' , 'ఎకరాలు' : 1200 , 'నేల_స్థితి' : 'తడి' ,
'దేశం' : 'జపాన్' }]
# పై డేటా నుండి డేటాఫ్రేమ్ను సృష్టించండి
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'ఎకరాలు' , 'దేశం' , 'నీటిపారుదల_లభ్యత' , 'నేల_రకం' ,
'నేల_స్థితి' ).write.insertInto( 'అగ్రి_టేబుల్1' )
# చివరి అగ్రి_టేబుల్ 1ని ప్రదర్శించండి
linuxhint_spark_app.sql( 'అగ్రి_టేబుల్1 నుండి * ఎంచుకోండి' ).show()
అవుట్పుట్:
ఇప్పుడు, డేటాఫ్రేమ్లో ఉన్న మొత్తం అడ్డు వరుసల సంఖ్య 7.
ముగింపు
మీరు ఇప్పుడు write.saveAsTable() ఫంక్షన్ని ఉపయోగించి PySpark DataFrameని టేబుల్కి ఎలా వ్రాయాలో అర్థం చేసుకున్నారు. ఇది పట్టిక పేరు మరియు ఇతర ఐచ్ఛిక పారామితులను తీసుకుంటుంది. అప్పుడు, మేము ఈ పట్టికను spark.read.table() ఫంక్షన్ని ఉపయోగించి PySpark DataFrameలోకి లోడ్ చేసాము. ఇది పాత్/టేబుల్ పేరు అనే ఒక పరామితిని మాత్రమే తీసుకుంటుంది. మీరు ఇప్పటికే ఉన్న పట్టికలో కొత్త డేటాఫ్రేమ్ను జోడించాలనుకుంటే, ఇన్సర్ట్ఇంటో() ఫంక్షన్ని ఉపయోగించండి.