logo

PySpark SQL

Apache Spark es el software más exitoso de Apache Software Foundation y está diseñado para computación rápida. Varias industrias están utilizando Apache Spark para encontrar sus soluciones. PySpark SQL es un módulo de Spark que integra el procesamiento relacional con la API de programación funcional de Spark. Podemos extraer los datos utilizando un lenguaje de consulta SQL. Podemos utilizar las consultas igual que el lenguaje SQL.

Si tiene conocimientos básicos de RDBMS, PySpark SQL será fácil de usar y podrá ampliar las limitaciones del procesamiento de datos relacionales tradicional. Spark también admite el lenguaje de consulta Hive, pero existen limitaciones en la base de datos de Hive. Spark SQL fue desarrollado para eliminar los inconvenientes de la base de datos de Hive. Echemos un vistazo a los siguientes inconvenientes de Hive:

Desventajas de la colmena

  • No puede reanudar el procesamiento, lo que significa que si la ejecución falla en medio de un flujo de trabajo, no puede continuar desde donde se quedó atascado.
  • No podemos eliminar las bases de datos cifradas en cascada cuando la papelera está habilitada. Conduce al error de ejecución. Para eliminar este tipo de base de datos, los usuarios deben utilizar la opción Purgar.
  • Las consultas ad-hoc se ejecutan utilizando MapReduce, que es iniciado por Hive pero cuando analizamos la base de datos de tamaño mediano, retrasa el rendimiento.
  • Hive no admite la operación de actualización o eliminación.
  • Se limita al soporte de subconsultas.

Estos inconvenientes son las razones para desarrollar Apache SQL.

Breve introducción a PySpark SQL

PySpark admite el procesamiento relacional integrado con la programación funcional de Spark. Proporciona soporte para diversas fuentes de datos y permite entrelazar consultas SQL con transformaciones de código, lo que resulta en una herramienta muy poderosa.

PySpark SQL establece la conexión entre el RDD y la tabla relacional. Proporciona una integración mucho más estrecha entre el procesamiento relacional y procedimental a través de la API declarativa de Dataframe, que está integrada con el código Spark.

Al usar SQL, puede ser fácilmente accesible para más usuarios y mejorar la optimización para los actuales. También admite una amplia gama de fuentes de datos y algoritmos en Big-data.

Característica de PySpark SQL

Las características de PySpark SQL se detallan a continuación:

1) Acceso a datos coherentes

Proporciona acceso constante a los datos, lo que significa que SQL admite una forma compartida de acceder a una variedad de fuentes de datos como Colmena, Avro, Parquet, JSON y JDBC. Desempeña un papel importante a la hora de acomodar a todos los usuarios existentes en Spark SQL.

2) Incorporación con Spark

Las consultas SQL de PySpark están integradas con los programas Spark. Podemos utilizar las consultas dentro de los programas Spark.

Una de sus principales ventajas es que los desarrolladores no tienen que gestionar manualmente los fallos de estado ni mantener la aplicación sincronizada con los trabajos por lotes.

3) Conectividad estándar

Proporciona una conexión a través de JDBC u ODBC, y estos dos son los estándares de la industria para la conectividad de herramientas de inteligencia empresarial.

4) Funciones definidas por el usuario

PySpark SQL tiene una función definida por el usuario (UDF) combinada en lenguaje. UDF se utiliza para definir una nueva función basada en columnas que amplía el vocabulario de DSL de Spark SQL para transformar DataFrame.

matriz en cadena

5) Compatibilidad de la colmena

PySpark SQL ejecuta consultas de Hive sin modificar sobre los datos actuales. Permite compatibilidad total con los datos actuales de Hive.

Módulo SQL de PySpark

Algunas clases importantes de Spark SQL y DataFrames son las siguientes:

    pyspark.sql.SparkSession:Representa el principal punto de entrada para Marco de datos y funcionalidad SQL.pyspark.sql.DataFrame:Representa una colección distribuida de datos agrupados en columnas con nombre.pyspark.sql.Columna:Representa una expresión de columna en un Marco de datos. pyspark.sql.Fila:Representa una fila de datos en un Marco de datos. pyspark.sql.GroupedData:Métodos de agregación, devueltos por Marco de datos.groupBy(). pyspark.sql.DataFrameNaFunciones:Representa métodos para manejar datos faltantes (valores nulos).Funciones de pyspark.sql.DataFrameStat:Representa métodos para la funcionalidad estadística.pysark.sql.funciones:Representa una lista de funciones integradas disponibles para Marco de datos. pyspark.sql.tipos:Representa una lista de tipos de datos disponibles.pyspark.sql.Ventana:Se utiliza para trabajar con funciones de Windows.

Considere el siguiente ejemplo de PySpark SQL.

 import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show() 

Producción:

 +-----+ |hello| +-----+ |spark| +-----+ 

Explicación del código:

En el código anterior, hemos importado el encontrar chispa módulo y llamado encontrarchispa.init() constructor; luego, importamos el módulo SparkSession para crear una sesión Spark.

desde pyspark.sql importar SparkSession

Se puede utilizar una sesión Spark para crear la API Dataset y DataFrame. También se puede utilizar una SparkSession para crear DataFrame, registrar DataFrame como una tabla, ejecutar SQL sobre tablas, almacenar en caché tablas y leer archivos parquet.

constructor de clases

Es un constructor de Spark Session.

obtenerOCrear()

Se utiliza para obtener una existente. sesión de chispa, o si no existe uno, cree uno nuevo según las opciones configuradas en el generador.

Algunos otros métodos

Algunos métodos de PySpark SQL son los siguientes:

1. nombre de la aplicación (nombre)

Se utiliza para establecer el nombre de la aplicación, que se mostrará en la interfaz de usuario web de Spark. El parámetro nombre acepta el nombre del parámetro.

2. config(clave=Ninguno, valor = Ninguno, conf = Ninguno)

Se utiliza para establecer una opción de configuración. Las opciones configuradas con este método se propagan automáticamente a ambos SparkConf y SparkSession La configuración.

 from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf()) 

Parámetros:

    llave-Una cadena de nombre de clave de una propiedad de configuración.valor-Representa el valor de una propiedad de configuración.conf -Una instancia de SparkConf.

3. maestro(maestro)

Establece la URL maestra de Spark a la que conectarse, como 'local' para ejecutarse localmente, 'local[4]' para ejecutarse localmente con 4 núcleos.

Parámetros:

    maestro:una URL para Spark Master.

4. SparkSession.catalog

Es una interfaz en la que el usuario puede crear, eliminar, alterar o consultar la base de datos, tablas, funciones, etc.

5. SparkSession.conf

Es una interfaz de configuración de tiempo de ejecución para Spark. Esta es la interfaz a través de la cual el usuario puede obtener y establecer todas las configuraciones de Spark y Hadoop que son relevantes para Spark SQL.

clase pyspark.sql.DataFrame

Es una colección distribuida de datos agrupados en columnas con nombre. Un DataFrame es similar a la tabla relacional en Spark SQL y se puede crear usando varias funciones en SQLContext.

 student = sqlContext.read.csv('...') 

Después de la creación del marco de datos, podemos manipularlo utilizando varios lenguajes específicos de dominio (DSL) que son funciones predefinidas de DataFrame. Considere el siguiente ejemplo.

 # To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id)  .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'}) 

Consideremos el siguiente ejemplo:

Consultar usando Spark SQL

En el siguiente código, primero creamos un DataFrame y ejecutamos las consultas SQL para recuperar los datos. Considere el siguiente código:

 from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA	op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show() 

Producción:

 +----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ 

Usando la función groupBy()

La función groupBy() recopila datos de categorías similares.

 songdf.groupBy('Genre').count().show() 

Producción:

 +----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+ 

distribución (numparticiones, *cols)

El distribución() devuelve un nuevo DataFrame que es una expresión de partición. Esta función acepta dos parámetros. numparticiones y *columna. El numparticiones El parámetro especifica el número objetivo de columnas.

 song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5) 

Producción:

 +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows