Mūsdienās dati aug un krājas ātrāk nekā iepriekš. Pašlaik aptuveni 90% no mūsu pasaulē ģenerētajiem datiem ir iegūti pēdējos divos gados. Šī ātruma pieauguma dēļ platformas lielie dati viņiem bija jāpieņem radikāli risinājumi, lai varētu uzturēt tik lielu datu apjomu.
kurš no šiem ir portjē piecu spēku modeļa centrs, kas tiek apspriests lekcijā?
Viens no nozīmīgākajiem datu avotiem mūsdienās ir sociālie mediji. Ļaujiet man parādīt piemēru reālai dzīvei: reāllaikā pārvaldīt, analizēt un iegūt informāciju no sociālo mediju datiem, izmantojot vienu no eko risinājumiem lielie dati vissvarīgākie no tiem - Apache Spark un Python.
Šajā rakstā es parādīšu, kā izveidot vienkāršu lietotni, kas, izmantojot Python, lasa čivināt tiešsaistes plūsmas, pēc tam apstrādā tvītus, izmantojot Apache Spark straumēšana lai identificētu hashtagus un, visbeidzot, atgrieztu svarīgākos trendējošos hashtagus un reālā laikā padarītu šos datus informācijas panelī.
Lai saņemtu čivināt no Twitter, jums jāreģistrējas vietnē TwitterApps Noklikšķinot uz 'Izveidot jaunu pieteikumu' un pēc zemāk esošās veidlapas aizpildīšanas noklikšķiniet uz 'Izveidot savu Twitter lietojumprogrammu'.
Otrkārt, dodieties uz savu jaunizveidoto lietojumprogrammu un atveriet logu 'Piekļuves identifikatori un atslēgas'. Pēc tam noklikšķiniet uz 'Ģenerēt manu piekļuves identifikatoru'.
Jūsu jaunie pieteikšanās ID tiks parādīti, kā parādīts zemāk.
Un tagad jūs esat gatavs nākamajam solim.
Šajā solī es jums parādīšu, kā izveidot vienkāršu klientu, kurš, izmantojot Python, ielādēs čivināt no Twitter API un pēc tam tos nodos instancei Dzirksteļu straumēšana . Jebkuram jābūt viegli sekojamam pitona izstrādātājs profesionāls.
Pirmkārt, mēs izveidosim failu ar nosaukumu twitter_app.py
un tad mēs pievienosim kodu kopā, kā redzams zemāk.
kādi ir leņķa komponenti
Importējiet bibliotēkas, kuras izmantosim, kā redzams zemāk:
import socket import sys import requests import requests_oauthlib import json
Un pievienojiet mainīgos, kas tiks izmantoti OAuth, lai izveidotu savienojumu ar Twitter, kā redzams zemāk:
# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Tagad izveidosim jaunu funkciju ar nosaukumu get_tweets
kas izsauks Twitter API URL un atgriezīs atbildi par virkni tvītu.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Tad jūs izveidojat funkciju, kas ņem atbildi no iepriekš redzamā skata un izraksta tweets tekstu no JSON objekta ar pilnu tweets. Pēc tam nosūtiet katru čivināt uz instanci Dzirksteļu straumēšana (tiks apspriests vēlāk), izmantojot TCP savienojumu.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + '
') except: e = sys.exc_info()[0] print('Error: %s' % e)
Tagad mēs izdarīsim galveno daļu. Tas ļaus lietojumprogrammai mitināt savienojumus kontaktligzda , ar kuru tas vēlāk savienosies Dzirksts . Iestatīsim šeit IP vērtību localhost
tā kā viss darbosies vienā mašīnā un ostā 9009
. Pēc tam mēs izsauksim metodi get_tweets
, ko mēs izdarījām iepriekš, lai saņemtu čivināt no Twitter un nosūtītu jūsu atbildi ar savienojumu kontaktligzda a send_tweets_to_spark
nosūtīt tvītus uz Spark.
TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)
Veidosim savu lietojumprogrammu Dzirksteļu straumēšana , kas reāllaikā apstrādās ienākošos tvītus, izvelk no tiem hashtagus un aprēķinās, cik hashtag ir pieminēts.
Pirmkārt, mums ir jāizveido instance Dzirksteles konteksts sc
, tad mēs izveidojam Straumēšanas konteksts ssc
no sc
ar divu sekunžu intervālu, kas veiks transformāciju visos pārraidījumos, kas saņemti ik pēc divām sekundēm. Ņemiet vērā, ka mēs iestatījām žurnāla līmeni uz ERROR
lai varētu atspējot lielāko daļu jūsu rakstīto žurnālu Dzirksts .
Mēs šeit definējam kontrolpunktu, lai varētu atļaut periodisku RDD pārbaudi; tas ir obligāti jāizmanto mūsu lietojumprogrammā, jo mēs izmantosim valstiskas ugunsdzēsības transformācijas (tiks apspriests vēlāk tajā pašā sadaļā).
Tad mēs definējam mūsu galveno DStream dataStream, kas savienos serveri kontaktligzda ko mēs iepriekš izveidojām ostā 9009
un tas lasīs tweets no šīs ostas. Katrs DStream ieraksts būs čivināt.
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009)
Tagad mēs definēsim savu transformācijas loģiku. Pirmkārt, mēs sadalīsim visus tvītus vārdos un ievietosim tos RDD vārdos. Tad mēs filtrējam tikai visu vārdu mirkļbirkas un uzzīmējam tos blakus (hashtag, 1)
un mēs tos ievietojām RDD hashtagos.
Tad mums jāaprēķina, cik reizes hashtag ir pieminēts. Mēs to varam izdarīt, izmantojot funkciju reduceByKey
Šī funkcija aprēķinās, cik reizes hashtag ir pieminējusi katra grupa, tas ir, tas atiestatīs kontu katrā grupā.
kas ir Windows kodēts
Mūsu gadījumā mums ir jāaprēķina skaits visās grupās, tāpēc mēs izmantosim citu funkciju ar nosaukumu updateStateByKey
tā kā šī funkcija ļauj saglabāt RDD statusu, vienlaikus atjauninot to ar jauniem datiem. Šo veidlapu sauc par Stateful Transformation
.
Ņemiet vērā, ka, lai izmantotu updateStateByKey
, jums jākonfigurē kontrolpunkts un iepriekšējā darbībā paveiktais.
# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination()
updateStateByKey
funkciju uzskata par parametru, ko sauc par update
funkciju. Tas tiek izpildīts katram RDD vienumam un veic vēlamo loģiku.
Mūsu gadījumā mēs esam izveidojuši atjaunināšanas funkciju ar nosaukumu aggregate_tags_count
kas summēs visas new_values
(jaunās vērtības) katram hashtagam un pievienos tos total_sum
(kopējā summa), kas ir visu grupu summa un saglabā datus RDD tags_totals
.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
Tad mēs veicam RDD apstrādi tags_totals
katrā grupā, lai to varētu pārveidot par pagaidu tabulu, izmantojot Spark SQL konteksts un pēc tam izveidojiet paziņojumu, lai varētu paņemt desmit labākos hashtagus ar saviem kontiem un ievietot tos datu rāmī hashtag_counts_df
.
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)
Pēdējais solis mūsu lietotnē Spark ir datu rāmja nosūtīšana hashtag_counts_df
uz informācijas paneļa lietotni. Tādējādi mēs pārveidosim datu ietvaru divās matricās, viena hashtagiem un otra viņu kontiem. Tad mēs virzīsimies uz informācijas paneļa lietotni, izmantojot REST API.
def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
Visbeidzot, šeit ir Dzirksteļu straumēšana skrienot un drukājot hashtag_counts_df
. Jūs ievērosiet, ka izvade tiek izdrukāta tieši ik pēc divām sekundēm katram grupas intervālam.
Tagad mēs izveidosim vienkāršu informācijas paneļa lietojumprogrammu, kuru Spark atjauninās reāllaikā. Mēs to veidosim, izmantojot Python, Flask un Charts.js .
Pirmkārt, mēs izveidosim Python projektu ar zemāk redzamo struktūru, lejupielādējiet un pievienojiet failu Chart.js statiskajā direktorijā.
Tad failā app.py
mēs izveidosim funkciju ar nosaukumu update_data
, kuru Spark izsauks caur URL http://localhost:5001/updateData
lai varētu atjaunināt globālās etiķetes un vērtību masīvus.
Līdzīgi funkcija refresh_graph_data
tas ir izveidots, lai to izsauktu AJAX pieprasījums atgriezt jaunās atjauninātās etiķetes un vērtību masīvus kā JSON. Funkcija get_chart_page
atstās lapu chart.html
kad sauc.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)
Tagad mēs izveidosim vienkāršu grafiku failā chart.html
lai varētu parādīt hashtag datus un tos atjaunināt reāllaikā. Kā noteikts tālāk, mums jāimportē JavaScript bibliotēkas Chart.js
un jquery.min.js
.
Taga pamattekstā mums jāizveido audekls un jāpiešķir tam ID, lai varētu uz to atsaukties, parādot diagrammu, kad nākamajā darbībā izmantojat JavaScript.
Top Trending Twitter Hashtags Top Trending Twitter Hashtags
Tagad mēs izveidosim diagrammu, izmantojot zemāk esošo JavaScript kodu. Pirmkārt, mēs ņemam audekla elementu un pēc tam izveidojam jaunu grafa objektu un nododam tam audekla elementu un definējam datu objektu, kā redzams zemāk.
Ņemiet vērā, ka datu etiķetes ir savienotas ar iezīmēm un vērtību mainīgajiem, kas tiek atgriezti, atstājot lapu, izsaucot get_chart_page
failā app.py
.
Pēdējā daļa ir funkcija, kas ir konfigurēta, lai katru sekundi veiktu Ajax pieprasījumu un izsauktu URL /refreshData
, kas izpildīs refresh_graph_data
iekš app.py
un tas atgriezīs jaunos atjauninātos datus un pēc tam atjauninās diagrammu, kuru atstāj jaunie dati.
ir skaņas mākonis labāk nekā spotify
var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);
Trīs lietojumprogrammas palaidīsim šādā secībā: 1. Twitter App Client. 2. Spark App. 3. Informācijas paneļa tīmekļa lietotne.
Tad reāllaikā varat piekļūt vadības panelim, meklējot URL
kā padarīt nesaskaņas botu 2018
Tagad jūs varat redzēt, kā jūsu diagramma tiek atjaunināta zemāk:
Mēs esam iemācījušies veikt vienkāršu datu analīzi reāllaika datos, izmantojot Spark Streaming, un tieši tos integrējot ar vienkāršu vadības paneli, izmantojot tīmekļa pakalpojumu RESTful. No šī piemēra mēs varam redzēt, cik Spark ir spēcīgs, jo tas uztver masveida datu plūsmu, pārveido to un iegūst vērtīgu informāciju, ko var viegli izmantot lēmumu pieņemšanai īsā laikā. Ir daudz noderīgu lietojumu, kurus var ieviest un kuri var kalpot dažādām nozarēm, piemēram, ziņas vai mārketings.
Ziņu nozares piemērs
Mēs varam izsekot visbiežāk minētās atsauces, lai uzzinātu, par kādām tēmām cilvēki runā sociālajos tīklos. Mēs varam arī izsekot noteiktiem mirkļbirkām un to tvītiem, lai uzzinātu, ko cilvēki saka par konkrētām tēmām vai notikumiem pasaulē.
Mārketinga piemērs
Mēs varam apkopot tvītu pārsūtīšanu un, veicot viedokļu analīzi, tos kategorizēt un noteikt cilvēku intereses, lai sniegtu viņiem ar viņu interesēm saistītus piedāvājumus.
Turklāt ir daudz izmantošanas gadījumu, kurus var izmantot tieši analīzei. lielie dati un tie var kalpot daudzām nozarēm. Lai iegūtu vairāk Apache Spark lietošanas gadījumu kopumā, iesaku pārbaudīt kādu no mūsu iepriekšējās ziņas .
Iesaku izlasīt vairāk par Dzirksteļu straumēšana šeit lai uzzinātu vairāk par tā iespējām un veiktu uzlabotas datu transformācijas, lai reāllaikā iegūtu vairāk informācijas, to izmantojot.