Hola, me llamo Miguel y hoy les traigo otro artículo.
Índice
Escribir aplicaciones Java
para recopilar datos de Twitter
y visualizarlos en un gráfico
En este artículo, crearemos un proyecto de ciencia de datos. Recopilamos datos de Twitter porque tiene una enorme cantidad de datos y nos permite obtenerlos.
Preferimos Java porque es un lenguaje compilado y tiene una fuerte biblioteca de simultaneidad. Finalmente, resumimos esos datos utilizando Gephi, que es una plataforma de gráficos de código abierto.
Gephi
a partir de tweets
de muestra el 14/9/20
. El idioma era TH
.Necesitamos lo siguiente para hacer el proyecto:
- IDE de Java. Nuestra elección es
Eclipse
. - Bibliotecas
Twitter4j
. Obtenga los archivos jar y el tutorial de aquí. - Cuenta de desarrollador de Twitter. Necesitamos esto para poder llamar a la API de Twitter. Hay algunos recursos que mencionan cómo obtener acceso.
- Cualquier base de datos compatible con
JDBC
. UsamosSqlite
. Es muy liviano. No se requiere instalación de software. Sin proceso de demonio. Simplemente copie el archivojar
deSqlite
en el proyecto. Sin embargo, existen algunas limitaciones que requieren soluciones. -
Gephi
, que es una herramienta gráfica de código abierto. Descárgalo desde aquí.
Por cierto, los lectores pueden usar cualquier lenguaje o plataforma que deseen, Python
o Node.js
. Nuestro código de muestra está en Java
.
Los siguientes son pasos para construir el gráfico de red de Twitter
:
- Recopile tweets y usuarios y guárdelos en una base de datos.
- Recupera los amigos de los usuarios. De la lista de usuarios del paso anterior, hazte amigos de ellos. Los guardaremos en tablas.
- Filtrar por los datos que nos gustaría ver en el gráfico.
- Exportar los datos a archivos CSV.
- Importe archivos CSV a Gephi. Haz un poco de formateo, diseño. Obtendremos un gráfico social de twitter.
Recopilar tweets
y usuarios
Para el primer paso, recopilamos tweets de muestra y luego los escribimos en tablas. Para hacer esto:
- Crea un objeto de flujo de
Twitter
. Pruebe la corriente. LaAPI
proporciona un subconjunto aleatorio de todos los tweets. - Por cada tweet recibido, envíe una tarea invocable a un servicio ejecutor. La tarea realizará operaciones de base de datos y / o procesamiento adicional.
A continuación se muestra el código:
package twitter; import java.util.concurrent.ExecutorService; import twitter4j.TwitterStreamFactory; class SampleStreamProcessor { private final ExecutorService execSvc; private final String language; private twitter4j.TwitterStream twitterStream; private long tweetCount = 0; public SampleStreamProcessor( ExecutorService execSvc, String language) { super(); this.execSvc = execSvc; this.language = language; } public void run() { twitterStream = new TwitterStreamFactory().getInstance(); twitterStream.onStatus(tweet->{ execSvc.submit(new SaveSampleTweetTask(tweet)); tweetCount++; }); twitterStream.onException(e->e.printStackTrace()); twitterStream.sample(language); } public void close() { twitterStream.cleanUp(); twitterStream.shutdown(); } }
El código de la tarea invocable guardará tweets
y objetos relacionados, como usuarios, en tablas. Al utilizar el servicio ejecutor, desacoplamos el procesamiento de tweets
y la tarea relacionada con la base de datos.
Incluso si a veces los tweets llegan más rápido de lo que la base de datos podría procesar, nuestra aplicación no se perderá nada.
Además, dado que usamos la base de datos Sqlite
y solo podría haber una escritura en la base de datos en un solo momento, el servicio ejecutor debe ser Single Thread Executor
. Lo siguiente es parte del código de la tarea:
package twitter; import java.sql.SQLException; import java.util.concurrent.Callable; import twitter4j.Status; class SaveSampleTweetTask extends SaveTweetTask implements Callable<Integer>{ private final Status status; public SaveSampleTweetTask(Status status) { super(status); this.status = status; } @Override public Integer call() { //Save status and status.getUser() via JDBC //Return number of rows inserted } }
Recuperar amigos de los usuarios
Del paso anterior, obtenemos una lista de usuarios que nos gustaría conocer a todos sus amigos. La API de Twitter devuelve las ID de amigo de un usuario específico, pero no más de 5000
ID en una sola solicitud.
Necesitamos llamar varias veces si ese usuario tiene más que eso. Además, Twitter
tiene límites de tasa. Solo permite 15
solicitudes
por ventana de 15 minutos
. Básicamente, 1
solicitud por minuto.
Para cada solicitud, podríamos consultar hasta 100 ID de usuario. El límite de velocidad para esto es de 300 solicitudes
por ventana de 15 minutos
. Entonces, son 20 solicitudes
por minuto.
Más detalles de la API de Twitter y los límites de velocidad aquí.
Para manejar los límites de tasa de manera efectiva, tendremos 2 subprocesos
. el primer hilo invocará la consulta de ID de amigo. El segundo hilo hará que los usuarios busquen parte.
El hilo del buscador de amigos pasará los identificadores de usuario al hilo de búsqueda del usuario a través de una cola de bloqueo. Básicamente, aquí usamos el patrón productor-consumidor
.
El siguiente código es parte de FriendsLookupRunnable
.
package twitter.friends; import java.sql.SQLException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.stream.Collectors; import twitter4j.IDs; import twitter4j.RateLimitStatus; import twitter4j.Twitter; import twitter4j.TwitterException; import twitter4j.User; import twitter4j.api.FriendsFollowersResources; public class FriendsLookupRunnable implements Runnable { private static final int TWITTER_RATE_LIMIT_EXCEEDED = 429; public static final long POISON_PILL = -1L; private static final int CALL_DELAY = 60*1000; //Wait 60 seconds per call private final DateFormat dateformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss"); private final BlockingQueue<Long> friendsLookupQueue; private final BlockingQueue<Long> usersLookupQueue; private long lastCalled = 0; private final FriendsFollowersResources friendsRes; private final boolean isRecursiveLookup = false; public FriendsLookupRunnable(Twitter twitter, BlockingQueue<Long> friendsLookupQueue, BlockingQueue<Long> usersLookupQueue) { super(); this.friendsLookupQueue = friendsLookupQueue; this.usersLookupQueue = usersLookupQueue; this.friendsRes = twitter.friendsFollowers(); } @Override public void run() { int count=0; Long userId; while( (userId = this.friendsLookupQueue.poll()) != null) { try { System.out.printf("%s : Processing %s...", dateformat.format(new Date()), user!=null ? user : userId.toString()); List<Long> friendIds = this.getFriendIds(userId); System.out.println("Done"); count += FriendDAO.insertUserFriends(userId, friendIds); if (this.isRecursiveLookup) this.friendsLookupQueue.addAll(friendIds); friendIds.removeAll(FriendDAO.getUserIds()); if (user == null) this.usersLookupQueue.add(userId); this.usersLookupQueue.addAll(friendIds); } catch (InterruptedException e) { e.printStackTrace(); break; } catch (SQLException e) { e.printStackTrace(); break; } catch (TwitterException e) { e.printStackTrace(); System.err.printf("\nSkipped this user: %d\n", userId); continue; } } //The last one this.usersLookupQueue.add(POISON_PILL); System.out.printf("Inserted %,d user-friends\n", count); } private List<Long> getFriendIds(long userId) throws InterruptedException, TwitterException { long cursor = -1; boolean hasNext; List<Long> friendIds = new ArrayList<>(); do { long timeSinceLastCalled = System.currentTimeMillis() - this.lastCalled; if (timeSinceLastCalled < CALL_DELAY ) { Thread.sleep(CALL_DELAY - timeSinceLastCalled); } IDs idsObj; try { this.lastCalled = System.currentTimeMillis(); idsObj = friendsRes.getFriendsIDs(userId, cursor); } catch (TwitterException e) { if (e.getStatusCode()==TWITTER_RATE_LIMIT_EXCEEDED) { //Retry hasNext=true; continue; } //Other twitter exception throw e; } //Convert long[] to List<Long> and add to the list friendIds.addAll(Arrays.stream(idsObj.getIDs()).boxed().collect(Collectors.toList())); cursor = idsObj.getNextCursor(); hasNext = idsObj.hasNext(); } while( hasNext ); return friendIds; } }
Algunos puntos clave:
- El método de ejecución de este ejecutable sondearía una identificación de usuario de una cola de bloqueo de identificaciones de usuario para procesar.
- Para cada ID, llame al método
getFriendIds
. Este método devuelve una lista de ID de amigos. Cada par de identificación de usuario y de amigo se inserta en la tablaUser_friend
. - Los ID de amigos resultantes también se colocan en otra cola de bloqueo. El otro subproceso recuperaría estos ID para procesarlos.
- El método
getFriendIds
realiza un seguimiento de cuándo fue la última vez que se llamó y se asegura de que haya suficiente retraso entre cada llamada (1 minuto) utilizandoThread.sleep ()
. - Aunque hacemos eso, hay casos muy raros en los que el límite de tasa excedió la excepción. Entonces, detectamos
TwitterException
y comparamos el código de estado de la excepción. Si el límite de frecuencia superó, simplemente volvemos a intentar la consulta. - Hay algunas otras excepciones. Por ejemplo, cuando un usuario está protegido, la API de Twitter le dará un error no autorizado.
El siguiente es el comando para crear la tabla User_Friend
que almacena el resultado del primer hilo:
CREATE TABLE User_Friend ( user_id INT (8), friend_id INT (8), PRIMARY KEY (user_id,friend_id) );
El siguiente código es la clase UsersLookupRunnable
.
package twitter.friends; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import twitter4j.ResponseList; import twitter4j.Twitter; import twitter4j.TwitterException; import twitter4j.User; import twitter4j.api.UsersResources; import utils.ArrayUtils; public class UsersLookupRunnable implements Runnable { private static final int MAX_LOOKUP_USERS_LEN = 100; private static final int CALL_DELAY = 3*1000; //Wait 3 seconds per call private final BlockingQueue<Long> usersLookupQueue; private long lastCalled = 0; private final UsersResources usersRes; public UsersLookupRunnable(Twitter twitter, BlockingQueue<Long> usersLookupQueue) { super(); this.usersLookupQueue = usersLookupQueue; this.usersRes = twitter.users(); } @Override public void run() { int count=0; boolean isDone = false; while(!isDone) { try { List<Long> userIds = new ArrayList<>(); userIds.add(this.usersLookupQueue.take()); if (this.usersLookupQueue.size()>0) { this.usersLookupQueue.drainTo(userIds); } if (userIds.contains(FriendsLookupRunnable.POISON_PILL)) { userIds.remove(FriendsLookupRunnable.POISON_PILL); isDone = true; } Long[] userIdsArray = userIds.toArray(new Long[userIds.size()]); List<User> users = this.lookupUsers(ArrayUtils.toPrimitiveArray(userIdsArray)); count += FriendDAO.insertUsers(users); } catch (InterruptedException e) { e.printStackTrace(); break; } catch (SQLException e) { e.printStackTrace(); } } System.out.printf("Inserted %,d users\n", count); } private List<User> lookupUsers(long[] userIds) throws InterruptedException { List<User> users = new ArrayList<>(); int start = 0; while ( start < userIds.length ) { int end = (start + MAX_LOOKUP_USERS_LEN) <= userIds.length ? start + MAX_LOOKUP_USERS_LEN : userIds.length ; long[] subIds = Arrays.copyOfRange(userIds, start, end); long timeSinceLastCalled = System.currentTimeMillis() - this.lastCalled; if (timeSinceLastCalled < CALL_DELAY ) { Thread.sleep(CALL_DELAY - timeSinceLastCalled); } ResponseList<User> friends; try { this.lastCalled = System.currentTimeMillis(); friends = usersRes.lookupUsers(subIds); } catch (TwitterException e) { e.printStackTrace(); start = end; continue; } users.addAll(friends); start = end; } return users; } }
Aquí hay algunos puntos clave:
- En el método de ejecución, hay un ciclo
while
para recuperar los identificadores de usuario de la cola. Luego llamará al métodolookupUsers
para realizar la búsqueda real - Dado que
Twitter lookupUsers API
no puede manejar más de100
ID de usuario a la vez, cortaremos una matriz de ID de usuario en matrices de 100 elementos o menos antes de llamar a la API de Twitter. - El método
lookupUsers
realiza un seguimiento de cuándo fue la última vez que se llamó y se asegura de que haya suficiente retraso entre cada llamada (3 segundos) utilizandoThread.sleep ()
. - El método devuitelve la lista de usuarios que se insertaría en la tabla de usuarios. La estructura de la tabla debe ser similar a la interfaz de usuario de Twter.
El siguiente es el comando para crear una tabla de usuario que almacena el resultado del segundo hilo:
CREATE TABLE User ( id INT (8) PRIMARY KEY, name VARCHAR (100), screen_name VARCHAR (100), description VARCHAR (255), email VARCHAR (50), favorites_count INT, followers_count INT, friends_count INT, statuses_count INT, lang VARCHAR (10), location VARCHAR (255), url VARCHAR (255), imageurl VARCHAR (255), is_protected INT (1), is_verified INT (1), created VARCHAR (20), last_modified VARCHAR (20) );
El método principal haría lo siguiente:
- Configurar conexión a la base de datos.
- Crea
2 colas
de bloqueo. - Prepara la lista de ID de usuario. Lo agrega a la primera cola de bloqueo.
- Crea
2
subprocesos ejecutables y2
. - Inicia los
2
hilos. - Agrega gancho de apagado. Entonces, cuando el proceso se mata, interrumpirá ambos hilos.
- Espere hasta que terminen ambos
2
hilos. - Limpiar la base de datos.
El código debería verse así:
package twitter.friends; import java.io.IOException; import java.sql.SQLException; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import db.DBResources; import twitter4j.Twitter; import twitter4j.TwitterFactory; import utils.FileUtils; public class FriendFinder { private Twitter twitter; public FriendFinder() throws SQLException { this.twitter = new TwitterFactory().getInstance(); } public void find(List<Long> userIds) throws InterruptedException, SQLException { BlockingQueue<Long> friendsLookupQueue = new LinkedBlockingQueue<>(); BlockingQueue<Long> usersLookupQueue = new LinkedBlockingQueue<>(); friendsLookupQueue.addAll(userIds); Thread friendsLookupThread = new Thread(new FriendsLookupRunnable(twitter, friendsLookupQueue, usersLookupQueue)); Thread usersLookupThread = new Thread(new UsersLookupRunnable(twitter, usersLookupQueue)); friendsLookupThread.start(); usersLookupThread.start(); Runtime.getRuntime().addShutdownHook(new Thread(()-> { friendsLookupThread.interrupt(); usersLookupThread.interrupt(); DBResources.getInstance().close(); })); friendsLookupThread.join(); usersLookupThread.join(); DBResources.getInstance().close(); } public static void main(String[] args) throws SQLException, InterruptedException, IOException { if (args.length < 2) { throw new IllegalArgumentException(); } String jdbcURL = args[0]; String fileName = args[1]; String sql = FileUtils.readFile(fileName); System.out.printf("PID is %d\n", ProcessHandle.current().pid()); DBResources.newInstance(jdbcURL); DBResources.getInstance().getConnection().setAutoCommit(false); List<Long> userIds = FriendDAO.getUserIds(sql); FriendFinder friendFinder= new FriendFinder(); friendFinder.find(userIds); } }
Filtrar los datos (opcional)
A veces, nos gustaría ver solo una parte de todos los datos. Es bastante sencillo hacer esto porque los datos ya están en tablas SQL.
Digamos que nos gustaría ver cómo los 100
usuarios principales con más seguidores en nuestros tweets de muestra se siguen unos a otros. A continuación le indicamos lo que debe hacer:
- Crea tablas para almacenar los resultados. A continuación se muestran las declaraciones SQL utilizadas:
CREATE TABLE Graph_Friend_Edge ( Source INT, Target INT ); CREATE TABLE Graph_Friend_Node ( id INT PRIMARY KEY, label VARCHAR (50), name VARCHAR (100), );
- Llene la mesa de borde con solo los mejores usuarios. El siguiente es el SQL:
insert into graph_friend_edge(source, target) select user_id, friend_id from user_friend join user u1 on friend_id=u1.id join user u2 on user_id=u2.id where user_id in (select friend_id from user_friend group by friend_id order by count(*) desc limit 100) and friend_id in (select friend_id from user_friend group by friend_id order by count(*) desc limit 100);
- Luego, complete la tabla de nodos con este SQL:
insert into graph_friend_node(id, label, name) select n.id, u.screen_name, u.name from(select source id from graph_friend_edgeunionselect target id from graph_friend_edge) n join user u on n.id = u.id;
Exportar datos a archivos CSV
Esta parte es simple. Utilice la herramienta de base de datos para exportar datos a archivos CSV.
- Exporta la tabla
user_friend
al archivo CSV de borde. - Exportar tabla de usuario al archivo CSV de nodo.
Crea un gráfico de red
Gephi es un análisis y visualización de gráficos de código abierto. Hay muchos tutoriales de Gephi disponibles. Echar un vistazo aquí. Para el tutorial de importación de archivos CSV, encuéntrelo aquí.
Los siguientes son una descripción general de los pasos a seguir en nuestro proyecto:
- Abre Gephi. Crea un nuevo proyecto.
- Importe archivos CSV de borde y nodo. El gráfico inicial puede verse tan impresionado como este:
100
mejores amigos de los usuarios antes de aplicar el diseño de los tweets de muestra el 14/9/20
. El idioma era TH
.Necesitamos mostrar las etiquetas de los nodos. Configure el tamaño y los colores del nodo. Aplica algún diseño.
- Habilitar etiquetas de nodo.
- Configure el tamaño del nodo y el tamaño de la etiqueta proporcional al grado (número de bordes entrantes).
- Elija el diseño de
"ForceAtlas2"
y ejecútelo. - Ejecute el algoritmo de detección de la comunidad.
- Establezca el color del nodo según la clase de modularidad. Esto coloreará el nodo de acuerdo con la comunidad que detectó.
Una vez realizados estos, el gráfico parece más significativo:
- Los nombres de pantalla del usuario se muestran como etiquetas de nodo.
- Los nodos con más seguidores en este grupo aparecen más grandes.
- Los bordes, las líneas de flecha, representan las relaciones de seguimiento
- Los nodos con los mismos colores están en las mismas comunidades según el algoritmo gráfico.
100
mejores amigos de los usuarios generado mediante el uso de Gephi a partir de tweets de muestra el 14/9/20
. El idioma era TH
.Hemos hecho un filtrado de los datos. Luego, se importa a Gephi, la plataforma de gráficos y la herramienta de visualización y se produce un gráfico de red social.
Esta es una parte muy pequeña de lo que podríamos haber hecho con los datos de Twitter. Gephi tiene mucho más que ofrecernos. Además, existen más plataformas de análisis de gráficos. Neo4j
, por ejemplo, podría permitirnos almacenar datos en su base de datos y ejecutar algoritmos de gráficos.
Gracias por leer este artículo.
Añadir comentario