GNU/Linux >> Tutoriales Linux >  >> Ubuntu

Guía de Spark Streaming para principiantes

Introducción

Spark Streaming es una adición a Spark API para transmisión en vivo y procesamiento de datos a gran escala. En lugar de lidiar con cantidades masivas de datos en bruto no estructurados y limpiarlos después, Spark Streaming realiza el procesamiento y la recopilación de datos casi en tiempo real.

Este artículo explica qué es Spark Streaming, cómo funciona y proporciona un caso práctico de ejemplo de transmisión de datos.

Requisitos previos

  • Apache Spark instalado y configurado (Siga nuestras guías:Cómo instalar Spark en Ubuntu, Cómo instalar Spark en Windows 10)
  • Configuración del entorno para Spark (usaremos Pyspark en cuadernos Jupyter).
  • Flujo de datos (usaremos la API de Twitter).
  • Bibliotecas de Python tweepy , json y enchufe para transmitir datos desde Twitter (use pip para instalarlos).

¿Qué es Spark Streaming?

Spark Streaming es una biblioteca de Spark para procesar flujos de datos casi continuos. La abstracción central es un flujo discretizado creado por la API Spark DStream para dividir los datos en lotes. La API de DStream está impulsada por Spark RDD (Conjuntos de datos distribuidos resistentes), lo que permite una integración perfecta con otros módulos de Apache Spark como Spark SQL y MLlib.

Las empresas aprovechan el poder de Spark Streaming en muchos casos de uso diferentes:

  • Transmisión en vivo de ETL – Limpieza y combinación de datos antes del almacenamiento.
  • Aprendizaje continuo – Actualizar constantemente los modelos de aprendizaje automático con nueva información.
  • Activación en eventos – Detección de anomalías en tiempo real.
  • Datos enriquecidos – Adición de información estadística a los datos antes del almacenamiento.
  • Sesiones complejas en vivo – Agrupación de la actividad de los usuarios para su análisis.

El enfoque de transmisión permite un análisis más rápido del comportamiento del cliente, sistemas de recomendación más rápidos y detección de fraude en tiempo real. Para los ingenieros, cualquier tipo de anomalía del sensor de los dispositivos IoT es visible a medida que se recopilan los datos.

Aspectos de Spark Streaming

Spark Streaming admite cargas de trabajo por lotes y de transmisión de forma nativa, lo que proporciona mejoras interesantes a las fuentes de datos. Este aspecto único satisface los siguientes requisitos de los sistemas modernos de transmisión de datos:

  • Equilibrio de carga dinámico. Dado que los datos se dividen en microlotes, los cuellos de botella ya no son un problema. La arquitectura tradicional procesa un registro a la vez, y una vez que aparece una partición computacionalmente intensa, bloquea todos los demás datos en ese nodo. Con Spark Streaming, las tareas se dividen entre los trabajadores, algunos procesan más tiempo y otros procesan tareas más cortas según los recursos disponibles.
  • Recuperación de errores. Las tareas fallidas en un nodo se discretizan y distribuyen entre otros trabajadores. Mientras los nodos trabajadores realizan cálculos, el rezagado tiene tiempo para recuperarse.
  • Análisis interactivo. Los DStreams son una serie de RDD. Los lotes de transmisión de datos almacenados en la memoria del trabajador consultan de forma interactiva.
  • Análisis avanzado. Los RDD generados por DStreams se convierten en DataFrames que consultan con SQL y se extienden a bibliotecas, como MLlib, para crear modelos de aprendizaje automático y aplicarlos a la transmisión de datos.
  • Rendimiento de transmisión mejorado. La transmisión por lotes aumenta el rendimiento, aprovechando latencias tan bajas como unos pocos cientos de milisegundos.

Ventajas y desventajas de Spark Streaming

Todas las tecnologías, incluido Spark Streaming, tienen ventajas y desventajas:

¿Cómo funciona Spark Streaming?

Spark Streaming se ocupa de análisis complejos y a gran escala casi en tiempo real. La canalización de procesamiento de transmisión distribuida pasa por tres pasos:

1. Recibir transmisión de datos desde fuentes de transmisión en vivo.

2. Proceso los datos en un clúster en paralelo.

3. Salida los datos procesados ​​en los sistemas.

Arquitectura de transmisión de Spark

La arquitectura central de Spark Streaming está en la transmisión discreta de lotes. En lugar de pasar por la canalización de procesamiento de flujo de un registro a la vez, los microlotes se asignan y procesan dinámicamente. Por lo tanto, los datos se transfieren a los trabajadores en función de los recursos disponibles y la localidad.

Cuando llegan los datos, el receptor los divide en particiones de RDD. La conversión a RDD permite procesar lotes con códigos y bibliotecas de Spark, ya que los RDD son una abstracción fundamental de los conjuntos de datos de Spark.

Fuentes de transmisión de Spark

Los flujos de datos requieren datos recibidos de las fuentes. Spark streaming divide estas fuentes en dos categorías:

  • Fuentes básicas. Las fuentes directamente disponibles en la API principal de Streaming, como las conexiones de socket y los sistemas de archivos compatibles con HDFS
  • Fuentes avanzadas. Las fuentes requieren vinculación de dependencias y no están disponibles en la API principal de Streaming, como Kafka o Kinesis.

Cada entrada DStream se conecta a un receptor. Para flujos de datos paralelos, la creación de múltiples DStreams también genera múltiples receptores.

Operaciones de transmisión de Spark

Spark Streaming incluye la ejecución de diferentes tipos de operaciones:

1. Operaciones de transformación modificar los datos recibidos de los DStreams de entrada, similares a los aplicados a los RDD. Las operaciones de transformación se evalúan con pereza y no se ejecutan hasta que los datos llegan a la salida.

2. Operaciones de salida envíe los DStreams a sistemas externos, como bases de datos o sistemas de archivos. Pasar a sistemas externos activa las operaciones de transformación.

3. Operaciones de DataFrame y SQL ocurren al convertir RDD en DataFrames y registrarlos como tablas temporales para realizar consultas.

4. Operaciones MLlib se utilizan para realizar algoritmos de aprendizaje automático, incluidos:

  • Algoritmos de transmisión se aplican a datos en vivo, como transmisión de regresión lineal o transmisión de k-means.
  • Algoritmos fuera de línea para aprender un modelo fuera de línea con datos históricos y aplicar el algoritmo para transmitir datos en línea.

Ejemplo de transmisión de Spark

El ejemplo de transmisión tiene la siguiente estructura:

La arquitectura se divide en dos partes y se ejecuta desde dos archivos:

  • Ejecutar el primer archivo para establecer una conexión con la API de Twitter y crear un socket entre la API de Twitter y Spark. Mantenga el archivo en ejecución.
  • Ejecutar el segundo archivo para solicitar y comenzar a transmitir los datos, imprimiendo Tweets procesados ​​a la consola. Los datos enviados sin procesar se imprimen en el primer archivo.

Crear un objeto de escucha de Twitter

El TweetListener El objeto escucha los Tweets de la transmisión de Twitter con el StreamListener de tweepy . Cuando se realiza una solicitud en el socket al servidor (local), el TweetListener escucha los datos y extrae la información del Tweet (el texto del Tweet). Si el objeto Tweet extendido está disponible, TweetListener obtiene el extended campo, de lo contrario el texto se obtiene el campo. Por último, el oyente agrega __end al final de cada Tweet. Este paso posterior nos ayuda a filtrar el flujo de datos en Spark.

import tweepy
import json
from tweepy.streaming import StreamListener
class TweetListener(StreamListener):
  # tweet object listens for the tweets
    def __init__(self, csocket):
        self.client_socket = csocket
    def on_data(self, data):
        try:  
            # Load data
            msg = json.loads(data)
            # Read extended Tweet if available
            if "extended_tweet" in msg:
                # Add "__end" at the end of each Tweet 
                self.client_socket\
                    .send(str(msg['extended_tweet']['full_text']+" __end")\
                    .encode('utf-8'))         
                print(msg['extended_tweet']['full_text'])
            # Else read Tweet text
            else:
                # Add "__end" at the end of each Tweet
                self.client_socket\
                    .send(str(msg['text']+"__end")\
                    .encode('utf-8'))
                print(msg['text'])
            return True
        except BaseException as e:
            print("error on_data: %s" % str(e))
        return True
    def on_error(self, status):
        print(status)
        return True

Si ocurre algún error en la conexión, la consola imprime la información.

Recopilar credenciales de desarrollador de Twitter

El portal para desarrolladores de Twitter contiene las credenciales de OAuth para establecer una conexión API con Twitter. La información está en la aplicación Claves y tokens pestaña.

Para recopilar los datos:

1. Genere la clave y secreto API ubicado en las Claves de consumidor del proyecto y guarde la información:

Las Claves de consumo verifica en Twitter tu identidad, como un nombre de usuario.

2. Genere el token de acceso y secreto de los tokens de autenticación y guarde la información:

Los tokens de autenticación permitir extraer datos específicos de Twitter.

Enviar datos desde la API de Twitter al socket

Con las credenciales de desarrollador, complete la API_KEY , API_SECRETO , ACCESO_TOKEN y ACCESO_SECRETO para acceder a la API de Twitter.

La función enviarDatos ejecuta la transmisión de Twitter cuando un cliente realiza una solicitud. Primero se verifica la solicitud de transmisión, luego se crea un objeto de escucha y los filtros de datos de transmisión se basan en la palabra clave y el idioma.

Por ejemplo:

from tweepy import Stream
from tweepy import OAuthHandler
API_KEY = "api_key"
API_SECRET = "api_secret"
ACCESS_TOKEN = "access_token"
ACCESS_SECRET = "access_secret"
def sendData(c_socket, keyword):
    print("Start sending data from Twitter to socket")
    # Authentication based on the developer credentials from twitter
    auth = OAuthHandler(API_KEY, API_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    # Send data from the Stream API
    twitter_stream = Stream(auth, TweetListener(c_socket))
    # Filter by keyword and language
    twitter_stream.filter(track = keyword, languages=["en"])

Crear un socket TCP de escucha en el servidor

La última parte del primer archivo incluye la creación de un socket de escucha en un servidor local. La dirección y el puerto están vinculados y escuchan las conexiones del cliente Spark.

Por ejemplo:

import socket
if __name__ == "__main__":
    # Create listening socket on server (local)
    s = socket.socket()
    # Host address and port
    host = "127.0.0.1"
    port = 5555
    s.bind((host, port))
    print("Socket is established")
    # Server listens for connections
    s.listen(4)
    print("Socket is listening")
    # Return the socket and the address of the client
    c_socket, addr = s.accept()
    print("Received request from: " + str(addr))
    # Send data to client via socket for selected keyword
    sendData(c_socket, keyword = ['covid'])

Una vez que el cliente Spark realiza una solicitud, el socket y la dirección del cliente se imprimen en la consola. Luego, el flujo de datos se envía al cliente en función del filtro de palabras clave seleccionado.
Este paso concluye el código en el primer archivo. Ejecutarlo imprime la siguiente información:

Mantenga el archivo ejecutándose y proceda a crear un cliente Spark.

Crear un receptor Spark DStream

En otro archivo, cree un contexto de Spark y un contexto de transmisión local con intervalos de lote de un segundo. El cliente lee desde el nombre de host y el socket del puerto.

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(appName="tweetStream")
# Create a local StreamingContext with batch interval of 1 second
ssc = StreamingContext(sc, 1)
# Create a DStream that conencts to hostname:port
lines = ssc.socketTextStream("127.0.0.1", 5555)

Preprocesar datos

El preprocesamiento de RDD incluye dividir las líneas de datos recibidas donde __end aparece y pasa el texto a minúsculas. Los primeros diez elementos se imprimen en la consola.

# Split Tweets
words = lines.flatMap(lambda s: s.lower().split("__end"))
# Print the first ten elements of each DStream RDD to the console
words.pprint()

Después de ejecutar el código, no sucede nada ya que la evaluación es perezosa. El cálculo comienza cuando comienza el contexto de transmisión.

Comenzar a transmitir contexto y computación

Iniciar el contexto de transmisión envía una solicitud al host. El host envía los datos recopilados de Twitter al cliente Spark y el cliente procesa previamente los datos. La consola luego imprime el resultado.

# Start computing
ssc.start()        
# Wait for termination
ssc.awaitTermination()

Iniciar el contexto de transmisión imprime en el primer archivo una solicitud recibida y transmite el texto de datos sin procesar:

El segundo archivo lee los datos cada segundo desde el socket y se aplica el preprocesamiento a los datos. El primer par de líneas están vacías hasta que se establece la conexión:

El contexto de transmisión está listo para finalizar en cualquier momento.


Ubuntu
  1. Una guía de la terminal de Linux para principiantes

  2. Cómo instalar MongoDB en Ubuntu 18.04 – Guía para principiantes

  3. Cron Job:una guía completa para principiantes 2022

  4. Guía para principiantes de rutas múltiples de Device Mapper (DM)

  5. Guía para principiantes de SELinux

Una guía para principiantes de LVM

Configurar una IP estática en Ubuntu:una guía para principiantes

Una guía para principiantes para entender sudo en Ubuntu

Tutorial de secuencias de comandos de Bash para principiantes

Una guía para principiantes sobre los trabajos de Cron

Tutorial de Bash Heredoc para principiantes

    Ventajas Desventajas
    Rendimiento de velocidad excepcional para tareas complejas Gran consumo de memoria
    Tolerancia a fallos Difícil de usar, depurar y aprender
    Implementación sencilla en clústeres en la nube No está bien documentado y los recursos de aprendizaje son escasos
    Soporte multilenguaje Visualización de datos deficiente
    Integración para marcos de big data como Cassandra y MongoDB Lento con pequeñas cantidades de datos
    Capacidad de unirse a múltiples tipos de bases de datos Pocos algoritmos de aprendizaje automático