• 售前

  • 售后

热门帖子
入门百科

CS领域论文数据分析

[复制链接]
我的承诺只给你 显示全部楼层 发表于 2022-1-12 15:12:33 |阅读模式 打印 上一主题 下一主题
CS领域论文数据分析

  Streaming

  I. Streaming应用场景

  计算机领域也有多个子领域,每个子领域获得的投入和关注度也是不同的,随着时间的变化,研究热潮也在发生改变,对于我们而言,这几年感受最深的就是 AI 相关领域的崛起,直到现在还保持着极高的热度。那么其他的子领域的热度是怎么样的,某个领域内部不同作者的热门程度是什么样的?我们希望对其进行一番探究,从而能够对计算机领域的发展形势有一定的了解。
  首先我们进行了以下的这些假设:
  

  • 某个领域的热门程度和该领域在特定时间段内产出的论文数量相关
  • 某个作者的热门程度和该作者的论文被引用数量相关
  从而可以得到具体的业务问题:
  

  • 统计特定时间段内热度靠前的领域并比较各领域热度
  • 统计特定时间段内某领域的论文发表数量变化趋势
  • 统计特定时间段内某领域的热门论文和作者
  • 分析特定时间段内各领域的热度变化趋势
  • 分析特定时间段内每年的最热门领域
  II. Streaming数据获取

  2.1 数据来源

  Arxiv、 ACM 两个影响力较大的论文数据库网站,这些网站保存的数据基本上涵盖了CS领域有影响力的会议和期刊,可以较好地反应高水平CS研究的变化趋势,我们选择爬取它们从 2000 年到现在为止的 CS 领域的论文数据,了解近20年来CS领域的发展。
  各个网站的数据规模大致为:ACM大致15万条,Arxiv大约30万条。此外,由于ACM是正式的论文数据库,而arxiv则是论文预印本网站,各个网站之间可能存在数据重复,我们会进行去重处理。去重后,再经过一些预处理,比如处理包含空值的数据等,最终的数据规模大约为45万条。
  每一篇论文的信息包括:论文题目,作者,发表月份,发表年份,论文所属领域分类,引用数量。论文的多维度信息允许我们从领域、时间、作者等多个角度入手,基于Streaming和GraphX挖掘潜在的有用信息,帮助我们加深对于CS各个领域发展趋势的认识。论文信息的一个示例如下所示:
    2.2 数据获取

  首先,我们期望爬取的数据字段如下:
  

  • title:论文题目
  • authors:作者信息
  • subjects:领域分类信息
  • year:发表(提交)年份
  • month:发表(提交)月份
  • citations:被引用数(Arxiv 的数据库中并不包含被引用信息,故统一设置为零)
  在编写爬虫代码时,我们使用了 Scrapy 作为爬虫框架,并针对 ACM 和 Arxiv 两个数据库编写了两份爬虫代码。本文将以 ACM 爬虫为例,介绍数据获取过程,而 Arxiv 的爬虫流程与 ACM 爬虫的流程类似。
  按照 Scrapy 框架,首先需要定义 start_requests 方法,该方法定义了最开始的请求。我们最开始的访问目标是 ACM 数据库的搜索页面,通过设置时间参数,来获得特定时间段的论文数据。
  1. def start_requests(self):
  2.   for year, month in DateRange(self.start_year, self.after_month, self.end_year, self.before_month):
  3.     self.params['AfterYear'], self.params['BeforeYear'] = year, year
  4.     self.params['AfterMonth'], self.params['BeforeMonth'] = month, month
  5.     url = self.base_url + urlparse.urlencode(self.params)
  6.     self.log(f'Begin crawling url {url}')
  7.     yield Request(url, callback=self.parse, cb_kwargs={'month': month, 'year': year})
复制代码
可以看到上面代码中最后一行定义的参数 callback=self.parse,其中 parse 是一个函数,它定义了如何解析返回的信息。我们首先通过 css 选择器来获取包含目标信息的元素并对之进行解析,可以得到论文的 title, citations 等信息。由于搜索页面返回的论文信息还缺少论文的领域(subjects)相关信息,所以还需要访问论文的详情页面进一步爬取,代码中 yield reponse.follow(title_link, ...) 的作用就是发起访问详情页面的请求。代码末尾的 if 语句块中,定义了当当前搜索结果页面的所有论文信息都爬取完成后,代码将寻找是否存在下一页,如果有则继续爬取。
  1. def parse(self, response, month, year):
  2.   titles = response.css('.issue-item__title a')
  3.   urls = response.css('div.issue-item__detail a.issue-item__doi::text').getall()
  4.   citations = response.css('span.citation span::text').getall()
  5.   for title, url, citation in zip(titles, urls, citations):
  6.     title_content = title.css('::text').get()
  7.     title_link = title.css('::attr(href)').get()
  8.     item = CcspiderItem(title=title_content, month=month, year=year, url=url, citation=citation)
  9.     # visit detail pages of papers for other information
  10.     yield response.follow(title_link, callback=self._parse_paper_detail, cb_kwargs={'item': item})
  11.     # go to next page
  12.     next_page = response.css('a.pagination__btn--next::attr(href)').get()
  13.     if next_page is not None:
  14.       yield scrapy.Request(next_page, callback=self.parse, cb_kwargs={'month': month, 'year': year})
复制代码
解析论文详情页面的方式与解析搜索页面的方式基本一致,同样是通过 css 选择器来获取并解析目标信息。不同的是,ACM 数据库的领域分类信息分成多级,我们设定最多爬取到二级分类。此外,为了区分分类的级别,我们在中间插入了分隔符 ||| 。相关代码如下:
  1. # get indexed term
  2. item['subjects'] = response.css('.article__index-terms ol.level-1 > li > div a::text').getall()
  3. item['subjects'].append('|||')  # a separator to separate level-1 terms and level-2 terms
  4. item['subjects'].extend(response.css('.article__index-terms ol.level-2 > li > div a::text').getall())
复制代码
最后,通过命令行输入以下命令启动爬虫,并把结果输出到 acm.csv 文件中。
  scrapy crawl acm -a start_year=some_year -a end_year=some_year -a before_month=some_month -a after_month=some_month -o acm.csv
  2.3 数据预处理

  数据预处理主要分为两个部分:
  

  • 建立 ACM 数据库和 Arxiv 数据库关于领域分类的映射。
  • 去除重复数据(Arxiv 为论文预印本网站,其中的论文也有在 ACM 的数据库中存在)
  建立映射
  在 Arxiv 的官方文档中,我们得到了 Arxiv 对论文按照领域进行分类的标准。Arxiv 在描述分类标准时,说明了某领域大致对应了 ACM 于 1998 年定制的分类标准中的某个分类(即ACM Classification Codes )。ACM 在 2012 年定制了新的分类标准,但是和 98 年的标准仍有部分关联。最终,我们以上述三个文档为参考,把 ACM 的二级分了映射到了 Arxiv 的分类中。下面是部分映射的展示(左边是 ACM 的分类,右边是 Arxiv 的分类代码):
  1. {
  2.     'Artificial intelligence': 'AI',
  3.     'Software and application security': 'CR',
  4.     'Dependable and fault-tolerant systems and networks': 'DC',
  5.     'Document management and text processing': 'DL',
  6.     'Systems security': 'CR',
  7.     'Human computer interaction (HCI)': 'HC',
  8.     'Real-time systems': 'PF',
  9.     'Software notations and tools': 'SE',
  10.     'Very large scale integration design': 'AR',
  11.     'Models of computation': 'CC',
  12.     'Randomness,  geometry and discrete structures': 'CG',
  13.     'Network protocols': 'NI',
  14.     'Security in hardware': 'CR',
  15.     'Education': 'CY',
  16.     'Machine learning': 'LG',
  17.     'Robustness': 'AR',
  18.     'Network security': 'CR',
  19.     'Network algorithms': ['DS', 'NI']
  20.     ...
  21. }
复制代码
在分类过程中,ACM 大部分二级分类都能找到比较合适的对应项,少部分无法归入 Arxiv 分类标准的领域被统一放入 Other 这一分类中。还有少部分 ACM 二级分类可能涉及 Arxiv 分类标准中的多个分类,我们都进行了记录,如上述例子中的最后一行 'Network algorithms': ['DS', 'NI'] ,网络算法被同时映射到了 Data Structure and Algorithms 和 Networking and information structure 两个分类中。
  数据去重
  去重采取的标准是:按照论文的题目来判断是否重复,如果重复,则保留 ACM 的论文数据。关键代码如下:
  1. file_list = ['acm_1.json', 'acm_2.json', 'arxiv.json', ...]
  2. data_list = []
  3. for file in file_list:
  4.     data_list.append(pd.read_json(file))
  5. result = pd.concat(data_list, ignore_index=True)
  6. result = result.sort_values(by=['citation'], ascending=False)
  7. result = result.drop_duplicatee(['title'], keep='first')
复制代码
代码最后两行首先对所有数据按照引用数进行了排序,这样做的目的是尽可能让 ACM 的数据排在前面(因为只有 ACM 的数据的 citation 一项可能不为零),让后在进行去重时,设置保留遇到的第一个数据即可保留 ACM 提供的论文数据。
  2.4 数据存储

  爬取并经过预处理后的数据以 json 格式进行存储,并最终传递到部署在云端的 HDFS 上。
  2.5 困难和解决方案

  在最开始的设想中,我们计划爬取的是三个数据库(ACM,IEEE 和 Arxiv)的数据,但是在实际爬取时,除了 Arxiv 的爬取速度较快之外(虽然 Arxiv 在 robots.txt 中规定了 15 秒的爬取间隔,但是每次搜索最多可以返回两百条记录,且仅从搜索页面就可以获取希望爬取的所有紫外),另外两个网站由于需要额外爬取论文的详情页面去获取需要的信息,且每个请求需要一定的间隔以防止 IP 被封,导致爬取速度十分缓慢。
  经过衡量,我们决定放弃爬取 IEEE 的数据库,一方面是 IEEE 访问的限制比 ACM 更严,另一方面是 ACM 的数据库的领域分类标准和 Arxiv 有一定的关联。
  由于 ACM 网站设置了爬虫的最短间隔时间为 1 秒(低于 1 秒 IP 会被封),经实际测试,爬取 15,000 条左右的数据,需要消耗接近 8 个小时左右的时间。最终,截至作业提交为止,我们花了十几天的时间,使用自己的机器,断断续续的爬取了从 2007 年到 2020 年的 ACM 的数据,最终爬取到了 150,000 条左右的数据,加上 Arxiv 的数据后,总共获取了 450,00 条数据。
  III. 流计算总体架构

  3.1 流准备和监听

  在本项目中,我们采用文件作为流的输入。Spark Structured Streaming 本身对文件形式的数据源有很好的支持,只需要指定 HDFS 的目标文件夹即可自动对该文件夹进行监听。在流启动之后,只需要向监听的文件夹传入文件,流便会将这份文件加入到处理的队列。
  首先介绍准备流的输入的过程。JsonSource 对象存储了读取数据的 schema 信息,并提供创建流的输入的方法 createData。该方法简单的调用了 Spark 提供的创建流的输入的接口,同时传入了 schema 和文件夹路径信息。
  1. object JsonSource {
  2.   val schema: StructType = StructType(
  3.     Array(
  4.       StructField(
  5.         "authors",
  6.         ArrayType(StringType)
  7.       ),
  8.       StructField("citation", IntegerType),
  9.       StructField("month", IntegerType),
  10.       StructField("subjects", ArrayType(StringType)),
  11.       StructField("title", StringType),
  12.       StructField("year", IntegerType)
  13.     )
  14.   )
  15.   def createData(spark: SparkSession, dir: String): DataFrame = {
  16.     spark.readStream
  17.       .schema(schema)
  18.       .json(dir)
  19.   }
  20. }
复制代码
创建并启动流的过程被放到了 Main.scala 文件中,如下方代码所示。我们通过调用 JsonSource 的 createData 方法,并传入 SparkSession 和指定监听文件夹路径,就创建了流的输入。随后,流的输入(paperData)作为参数传入一个工厂方法用于创建流对象(这里用到的流对象是我们自定义的类,而不是 Spark 提供的类,它的详细情况将在下一小节进行说明)。StreamFactory 采用了工厂模式,它的不同方法将创建不同业务目标的流对象。最后,在代码结尾部分,通过调用流对象的 start() 方法启动流。
  1. object Main {
  2.   ...
  3.   def main(args: Array[String]): Unit = {
  4.     val spark = SparkSession
  5.       .builder()
  6.       .appName("CCStreaming")
  7.       .config("spark.cores.max", 4)
  8.       .master(SPARK_MASTER_ADDRESS)
  9.       .getOrCreate()
  10.     val paperData = source.JsonSource.createData(spark, HDFS_DIRECTORY)
  11.     val authorCitationStream =
  12.       StreamFactory.createAuthorCitationStream(
  13.         paperData,
  14.         s"$CHECK_POINT_DIR/authorCitations"
  15.       )
  16.    
  17.     ...
  18.     authorCitationStream.start()
  19.     paperCitationStream.start()
  20.     paperCountStream.start()
  21.     spark.streams.awaitAnyTermination()
  22.   }
  23. }
复制代码
3.2 流计算

  首先,我们定义了抽象类 Stream 。它定义了两个需要子类实现的接口,分别是 transformSource 和 setSink 。其中 transformSource 需要子类定义对流的操作,子类在进行实现时会根据各自的业务需求进行不同的操作;而 setSink 方法则需要子类定义怎样对流的输出进行保存,在我们的实现中,流的输出均被保存到了 MySQL 数据库中。
  1. abstract class Stream(
  2.     source: DataFrame,
  3.     checkpointDir: String
  4. ) {
  5.   private var streamingQuery: StreamingQuery = _
  6.   def start(): Unit = {
  7.     val transformation = transformSource(source)
  8.     val driver = setSink(transformation)
  9.     streamingQuery = driver.start()
  10.   }
  11.   def stop(): Unit = {
  12.     if (streamingQuery != null)
  13.       streamingQuery.stop()
  14.   }
  15.   protected def transformSource(data: DataFrame): DataFrame
  16.   protected def setSink(
  17.       data: DataFrame,
  18.       checkpointDir: String = checkpointDir
  19.   ): MysqlSinkDriver
  20. }
复制代码
下面介绍 Stream 的子类之一:PaperCountStream 。该子类负责统计每一年每一月中每个领域的论文数量和作者数量。相关代码如下:
  1. class PaperCountStream(source: DataFrame, checkpointDir: String)
  2.     extends Stream(
  3.       source,
  4.       checkpointDir
  5.     ) {
  6.   override protected def transformSource(data: DataFrame): DataFrame = {
  7.     data
  8.       .withColumn("subject", explode(col("subjects")))
  9.       .select(
  10.         col("year"),
  11.         col("month"),
  12.         col("subject"),
  13.         size(col("authors")).alias("author_count")
  14.       )
  15.       .groupBy("year", "month", "subject")
  16.       .agg(count(lit(1)).alias("paper_count"), sum("author_count"))
  17.   }
  18.   override protected def setSink(
  19.       data: DataFrame,
  20.       checkpointDir: String
  21.   ): MysqlSinkDriver = {
  22.     import PaperCountStream._
  23.     new MysqlSinkDriver(
  24.       data,
  25.       checkpointDir,
  26.       preparedSql,
  27.       settingSql
  28.     )
  29.   }
  30. }
  31. object PaperCountStream {
  32.   private val preparedSql = "insert into subject_paper_count " +
  33.     "(year, month, subject, paper_count, author_count) " +
  34.     "values (?, ?, ?, ?, ?) " +
  35.     "ON DUPLICATE KEY UPDATE paper_count = ?, author_count = ?;"
  36.   //noinspection DuplicatedCode
  37.   private def settingSql(statement: PreparedStatement, row: Row): Unit = {
  38.     statement.setInt(1, row.getInt(0))
  39.     statement.setInt(2, row.getInt(1))
  40.     statement.setString(3, row.getString(2))
  41.     statement.setInt(4, row.getLong(3).toInt)
  42.     statement.setInt(5, row.getLong(4).toInt)
  43.     statement.setInt(6, row.getLong(3).toInt)
  44.     statement.setInt(7, row.getLong(4).toInt)
  45.   }
  46. }
复制代码
可以看到,该子类对 Stream 定义的接口均进行了具体的实现。该子类的 transformSource 的大致逻辑是:首先将 subjects 展开成多条数据(subjects 列是一个数组,里面包含了一至多个领域分类信息),让一条数据只包含一个领域分类。然后进行分组和聚合操作,统计某年某月某领域的作者数和文章数。
  如下是另外两个流对数据进行的转换操作相关代码。下图中的第一段代码的操作大致是将论文按照领域展开成多条数据后,选择引用数大于 10 的论文写入数据库。而第二段代码的操作大致是将一条数据按照作者和领域展开成多条数据,使每一条数据只有一个作者和领域,然后按照作者、年份和领域进行分组,得到某年某领域某个作者所获得的总引用数,最终选择总引用数高于 10 的作者写入数据库。第一段代码对应的业务需求是获取某领域中引用数靠前的论文;而第二段代码对应的业务需求是统计某年某领域引用数排名靠前的作者。
  1. // 第一段代码
  2. data
  3. .filter(col("citation") > 10)
  4. .withColumn("subject", explode(col("subjects")))
  5. .select(
  6.     col("year"),
  7.     col("month"),
  8.     col("subject"),
  9.     col("title"),
  10.     col("citation")
  11. )
  12. // 第二段代码
  13. data
  14. .filter(col("citation") > 0)
  15. .withColumn("author", explode(col("authors")))
  16. .withColumn("subject", explode(col("subjects")))
  17. .select(
  18.     col("year"),
  19.     col("author"),
  20.     col("subject"),
  21.     col("citation")
  22. )
  23. .groupBy("year", "author", "subject")
  24. .agg(sum("citation").alias("totalCitations"))
  25. .filter(col("totalCitations") > 10)
复制代码
而 setSink 用到了另一个类 MysqlSinkDriver ,它的职责是对创建输出流。在 setSink 方法中传入了 SQL 语句以及对该语句进行传值的函数作为 MysqlSinkDriver 的参数,MysqlSinkDriver 在接受了所有参数后,就会去处理 JDBC 的相关连接,并根据接受到的 sql 和设置函数(settingSql)执行具体的写数据库操作。如下是 MysqlSinkDriver 的关键代码:
  1. def start(): StreamingQuery = {
  2.   val writer = new JDBCSink(preparedSql, settingSql)
  3.   dataFrame.writeStream
  4.   .foreach(writer)
  5.   .outputMode("update")
  6.   .option("checkpointLocation", checkPointDir)
  7.   .start()
  8. }
复制代码
可以看到,在 start 方法中,定义了输出流怎样去写每一条数据(foreach(writer))、输出模式(ouputMode)、检查点存储位置(checkpointLocation)等信息。其中,JDBCSink 是实现 Spark 提供的 ForeachWriter 抽象类的一个类,它负责具体的连接数据库,执行 SQL 语句操作,它的核心代码如下:
  1. override def open(partitionId: Long, epochId: Long): Boolean = {
  2.     import JDBCSink._
  3.     Class.forName(JDBC_DRIVER_CLASS) // Load the driver
  4.     connection = DriverManager.getConnection(JDBC_URL, USER, PASSWORD)
  5.     statement = connection.prepareStatement(preparedSql)
  6.     true
  7. }
  8. override def process(value: Row): Unit = {
  9.     setValue(statement, value)
  10.     statement.executeUpdate()
  11. }
复制代码
上面这段代码的两个方法分别负责和数据库建立连接以及执行 SQL 语句。
  在启动流之前,还需要创建数据库表,针对 PaperCountStream 我们创建的数据库表的 sql 语句如下。完成创建后,即可启动流,传入数据,等待流的处理,一段时间之后便可以在数据库看到流处理的结果了。
  1. create table subject_paper_count
  2. (
  3.         month int not null,
  4.         year int not null,
  5.         subject varchar(128) not null,
  6.         paper_count int not null,
  7.     author_count int not null,
  8.     constraint subject_paper_count_pk
  9.                 primary key (month, year, subject)
  10. );
复制代码
最后,除了上面介绍到的 PaperCountStream 外,我们还实现了另外两个流 AuthorCitationStream 和 PaperCitationStream ,分别用于处理另外两个业务需求。它们的实现方式和 PaperCountStream 类似,不再赘述。
  3.3 计算结果动态展示

  见 PPT 中的视频展示部分。
  3.4 困难和解决方案

  主要困难在于数据量导致的问题。在进行测试时,我们使用了 ACM 2011 年的论文数据,数据量在一万三千条左右。在实现“统计某年某领域的引用数排名靠前的作者有哪些”这一需求时,中途需要将一条数据展开成多条数据,让每一条数据只包含一个领域和一个作者,如果一条数据存在 n 个作者和 m 个领域分类,那么一条数据最终将展开成 m x n 条数据。根据实际情况,13,000 条论文数据根据领域展开大致能获得 60,000 条数据,假设一篇论文的作者数量为 3,那么最终需要处理的数据量大致在 180,000 条数据左右。在测试过程中,在我们的环境下(三台云主机作为 Slaves,总共 6 个核),三个流全部计算完并写入数据库需要 9 分钟左右,且数据库写入了七万多条数据,其中写数据库占用了大量的时间。
  为了减小数据量,我们起初尝试让 Spark 的流只记录并写入排名靠前的记录。但是这种操作 Spark 流计算不支持,我们猜测原因是需要维持的状态和进行的对比过多,过于复杂,所以并不支持。最后,我们采取对数据按照一定条件过滤的方式来减小数据量。首先,我们对被引用数为 0 的论文数据进行了过滤,它们对统计作者被引用数不会产生影响。在计算结果完成但还没有进行数据库写操作时,我们设置需要该作者的被引用数大于一定数值(本项目中为 10)才被写入,目的是为了过滤引用数过小,极大概率不会出现在靠前排名的数据。最终, 三个流处理 13,000 条数据的时间大致在 4 分钟左右,并且数据库需要存储的数据量也大幅减少了。
  IV. 业务需求实现

  4.1 领域热度统计

  针对业务问题「统计特定时间段内热度靠前的领域并比较各领域热度」,我们的实现是:用户选择要统计的时间段(默认为5年)之后,调用Streaming计算各领域在该时间段内的作者数和产出的论文数,并以论文数作为该领域热度的标准进行排名,返回热度最高的前20个领域,并用表格和饼图来展示,相关系统的运行截图如下所示。
    表格按照热度的降序展示各个领域的基本信息,并提供了进一步展示各个领域详细信息的用户接口(见[4.2 论文数量变化](# 4.2 论文数量变化)和[4.3 热门论文和作者](# 4.3 热门论文和作者));饼图中则按照论文数量设置各区域的大小,直观比较各领域的热门程度。通过实现此业务需求,我们帮助用户把握近年来各领域的发展趋势,抓住当前的热门领域。
  例如,正如我们所猜测的,近年来算力的大幅提升和深度学习技术的广泛应用,使得AI领域以及CV、NLP等应用深度学习技术的子领域在热门程度上独占鳌头。这反映了这些领域存在大量可供探索的新问题和亟待优化的旧方法,存在较为广阔的研究前景,是值得关注的热门领域。
  4.2 论文数量变化

  针对业务问题「统计特定时间段内某领域的论文发表数量变化趋势」,根据用户输入的统计年份数以及在表格中选择的领域信息,我们调用Streaming计算该领域在该时间段内每年的论文产出,相关系统的运行截图如下所示。
    这么做的理由在于,论文发表数是领域热度的晴雨表:当论文数量逐步上升,说明当前领域方兴未艾,可探索空间较广;当论文数量逐年下降,说明当前领域可能正走向寒冬,在之后一段时间可能处于艰难期;当论文数量居高不下,意味着虽然领域本身势头强劲,但是也存在着竞争激烈、鱼龙混杂等问题。
  另一个值得关注论文发表数的理由在于:某一领域内部也存在“大小年”的区别,例如在具有重要启发意义的GAN提出之后,必然在之后的几年内引起GAN应用论文发表的热潮,通过关心这样的数量变化趋势,能够帮助我们把握该领域一些重要论文的影响,对领域热度的变化有更加清晰直观的了解。
  4.3 热门论文和作者

  针对业务问题「统计特定时间段内某领域的热门论文和作者」,根据用户输入的统计年份数以及在表格中选择的领域信息,我们调用Streaming计算该领域在该时间段内各篇论文和各个作者的被引用数,并返回被引用数最高的前10篇论文和前10位作者,使用排行榜进行展示。相关系统的运行截图如下所示。
    通过搜索某一领域内热门的论文和作者,我们可以了解到该领域的前沿发展,着眼于该领域内有影响力的工作:那些高引用的论文往往是这一领域的经典论文,是很多后续研究的基石;而高引用的作者意味着他的工作被同行广泛认可,是值得关注和借鉴的。
  4.4 领域热度变化/年度热门领域

  针对业务问题「分析特定时间段内各领域的热度变化趋势」,我们的实现是:用户选择要统计的时间段(默认为5年)之后,调用Streaming计算各领域在该时间段内每个月产出的论文数,并以从开始时刻至当前月份的论文总数作为该领域热度的排名,每个月份返回热度最高的前10个领域,使用动态排名展示结果,相关系统的运行截图如下图左半部分所示。
    各领域的动态排名反映了特定时间段内不同领域此消彼长的趋势,其中长期排名领先的领域则可以毫无置疑地被看作近年来的热门领域;那些逐渐离开排行榜的领域可能是因为遇到发展瓶颈、热度消退;而异军突起的新领域则有可能是在近年来有了突破性的成果,带动了相关研究的深入。
  此外,为了体现数据流的实时性,我们使用Streaming监听特定文件夹以读取新的数据。初始状态下所有的论文数据都没有被读取,在用户输入查找的时间段之后,系统会查找该时间段没有被读取过的论文数据,将其移至该文件夹以供Streaming读取,并记录相应时间段的数据已被读取。在这个过程中,前端会定时更新数据,显示已被Streaming读取的数据总量,以体现数据流读取的过程。用户随时可以访问一段时间内的动态排名,但是这个排名数据是基于Streaming已经读取的数据得到的,这个结果虽然是“不完全”的,但是和Streaming已经获取到的所有数据是匹配的,这体现了Streaming实时流式计算的特点。
  针对业务问题「分析特定时间段内每年的最热门领域」,我们的实现是:用户选择要统计的时间段(默认为5年)之后,调用Streaming计算各领域在该时间段内每年产出的论文数,从而计算得到每年热度最高的领域,使用柱状图展示结果,相关系统的运行截图如上图右半部分所示。
  如果说动态排名以增量化的方式展示了各个领域的热度对比,那么柱状图则关注每年热门领域的变化,希望形成对于热门领域变迁的总体印象。
  GraphX

  I. GraphX应用场景

  在Streaming应用场景中,我们基于各个领域的论文数比较了不同领域的热门程度。除此之外,我们也希望关注各个领域之间的关系,了解某个领域可以和哪些领域产交叉;在某一个领域内部我们则希望寻找紧密合作的学术圈子,以及探寻作者之间的合作关系。在这个过程中,我们可以在和自己领域交叉的领域中发现新的创新点,以及找到和别的作者的合作路径,进而取得更加优秀的学术成果。
  首先我们进行了以下的这些假设。
  

  • 我们假设两个领域的交叉是指有论文同属于两个不同的领域
  • 一个领域能够和更多的领域交叉,意味着它包含更多等待解决的问题,有更好的发展前景
  • 我们假设两个作者的合作是指他们曾经合作发表过论文
  我们具体要解决的问题如下:
  

  • 统计特定时间段内各个领域和其他领域的交叉程度,并找出交叉程度高的领域
  • 统计特定时间段内和某领域交叉程度较高的领域
  • 探究某领域内作者间的合作情况,寻找各个作者所活跃的课题小组
  • 计算两个作者关系可以通过哪些合作者产生合作,探寻两个作者之间可能的合作关系
  II. GraphX数据获取

  同[Streaming数据获取](# II. Streaming数据获取)。
  III. 图计算总体架构

  3.1 图构造过程

  我们主要构造以下两种图用于计算:
  

  • 领域的共同论文构成的图

    • 以每个领域为点,同时属于两个领域的论文数为边,构成一张无向图。
    • 用于进行领域共同论文数的统计和领域交叉度的排名计算。

  • 作者的共同论文构成的图

    • 以每个作者为点,同时属于两个作者(不论一作、二作还是其他作者)的论文的数量为边,构成一张无向图。
    • 用于进行作者论文数的统计、作者分类的计算以及作者间合作关系的搜索。
    • 每张图限制在单个领域内,以限制数据量。

  图的构造遵循GraphX的框架实现,主要有以下三个步骤:
  

  • 构造点的集合
  • 构造边的集合
  • 用点集和边集生成图
  不难发现两种图在构造上有相似之处,这是因为领域和论文、作者和论文都是多对多关系。因此我们构造了一个GraphHelper对象来封装以上三个步骤的实现过程:
  1. object GraphHelper {
  2.   def getVertices(df: DataFrame, colName: String): DataFrame = {
  3.       // 详见下文
  4.   }
  5.   def getEdges(df: DataFrame, vertices: DataFrame, colName: String, minCount: Int): DataFrame = {
  6.     getEdges(df, vertices, colName, minCount, "idx_1", "idx_2")
  7.   }
  8.   def getEdges(df: DataFrame, vertices: DataFrame, colName: String, minCount: Int, idColName1: String,
  9.                idColName2: String): DataFrame = {
  10.       // ...
  11.   }
  12.   def loadGraph(vertices: DataFrame, edges: DataFrame): Graph[Long, Long] = {
  13.       // ...
  14.   }
  15. }
复制代码
于是在各个计算主流程中,我们只需要按顺序调用这三个方法即可:
  1. // 读取数据文件
  2. val df = Utils.loadData(sparkSession, startYear).cache()
  3. // 获取点集,统计出所有领域作为点并编号
  4. val vertices = GraphHelper.getVertices(df, "subject").cache()
  5. // 获取边集,注意这里是有向边的集合,原本一条无向边在这由两条有向边代替
  6. val edges = GraphHelper.getEdges(df, vertices, "subject", 0).cache()
  7. // 生成图
  8. val graph = GraphHelper.loadGraph(vertices, edges)
复制代码
由于点集vertices和边集edges在之后的代码逻辑中仍然有用,因此不再做进一步的封装。
  下面分别介绍三个步骤的具体实现。
  3.1.1 构造点的集合

  1. object GraphHelper {
  2.   def getVertices(df: DataFrame, colName: String): DataFrame = {
  3.     df.withColumn(colName, explode(col(colName + 's')))
  4.       .select(colName)
  5.       .groupBy(colName)
  6.       .count()
  7.       .withColumn("id", monotonically_increasing_id())
  8.       .select(col("id"), col(colName), col("count").alias("paper_count"))
  9.   }
  10. }
复制代码
df是论文数据,每行记录是一篇论文(下同)。以作者图(calName="author")为例,首先通过explode()函数按作者进行一个行展开操作,然后group by作者并顺便统计论文数,即得到作者的集合。
  3.1.2 构造边的集合

  1. object GraphHelper {
  2.   def getEdges(df: DataFrame, vertices: DataFrame, colName: String, minCount: Int): DataFrame = {
  3.     getEdges(df, vertices, colName, minCount, "idx_1", "idx_2")
  4.   }
  5.   def getEdges(df: DataFrame, vertices: DataFrame, colName: String, minCount: Int, idColName1: String,
  6.                idColName2: String): DataFrame = {
  7.     val colName1 = colName + '1'
  8.     val colName2 = colName + '2'
  9.     // 统计出指定列间的边的值:出现在同一论文中的次数
  10.     val exploded = df.withColumn(colName1, explode(col(colName + 's')))
  11.       .select(col("title"), col(colName1))
  12.       .cache()
  13.     val tmp1 = vertices.join(exploded, vertices(colName) === exploded(colName1))
  14.       .select(col(colName1), col("title"))
  15.     val tmp2 = vertices.join(exploded, vertices(colName) === exploded(colName1))
  16.       .select(col(colName).as(colName2), col("title"))
  17.     var counts = tmp1.join(tmp2, tmp1("title") === tmp2("title") && tmp1(colName1) =!= tmp2(colName2))
  18.       .groupBy(colName1, colName2)
  19.       .count()
  20.     if (minCount > 0) {
  21.       counts = counts.filter(col("count") > minCount)
  22.     }
  23.     counts.join(vertices, vertices(colName) === counts(colName1))
  24.       .selectExpr(colName1, colName2, "count", "id as " + idColName1)
  25.       .join(vertices, vertices(colName) === counts(colName2))
  26.       .selectExpr(colName1, colName2, "count", idColName1, "id as " + idColName2)
  27.   }
  28. }
复制代码
以作者图(calName="author")为例,同样通过explode()函数按作者行展开,然后join点集vertices以减少数据量。接着如代码中counts变量的首条赋值语句所示,通过join自身来得到边的集合,然后group by边的起点和终点以聚合相同的边。有些计算场景只需要超过一定次数的边,因此需要在此处根据传入的minCount进行过滤。最后join点集来拼接点的ID数据。
  3.1.3 用点集和边集生成图

  1. object GraphHelper {
  2.   def loadGraph(vertices: DataFrame, edges: DataFrame): Graph[Long, Long] = {
  3.     val points = vertices.rdd.map(row => (row.getLong(0), row.getLong(2)))
  4.     val edgesRdd = edges
  5.       .rdd
  6.       .map(row => {
  7.         Edge[Long](row.getLong(3), row.getLong(4), row.getLong(2))
  8.       })
  9.     // 建图
  10.     Graph(points, edgesRdd)
  11.   }
  12. }
复制代码
vertices和edges分别是处理好的点集和边集,GraphX对点和边的ID数据和权重数据有位置等格式要求,因此此处通过map()函数进行格式转换,最后调用org.apache.spark.graphx.Graph()函数即可构建出图。另外,我们的代码到此时都只是对DataFrame等spark数据结构的映射操作,并未真正要求过某个步骤的计算结果,因此spark得以利用其惰性计算的特性按兵不动,在最后执行时统一进行更为彻底和全面的优化。
  3.2 基于图的计算过程

  3.2.1 领域交叉度排名计算

  我们基于领域图,使用PageRank算法对领域交叉程度进行计算。GraphX框架提供了PageRank算法的实现,我们直接调用即可得到各交叉度的得分:
  1. val ranks = graph.pageRank(0.0001).vertices
复制代码
然后通过join组装上起始年份和领域名称等信息,最后写入MySQL保存。数据表结构如下:
  1. create table subject_crossover_rank
  2. (
  3.     start_year     int          not null,
  4.     subject        varchar(128) not null,
  5.     crossover_rank double       not null,
  6.     primary key (start_year, subject)
  7. );
复制代码
我们用于计算的论文的发表时间范围是从起始年份start_year至今。我们对不同的起始年份分别算出一份结果,以此展现每年的变化。下面的所有图计算同样如此。
  3.2.2 领域共同论文数统计

  本质上就是把用于计算PageRank的边集连同边的权重一起保存下来,数据处理和组装过程不再赘述。最后存入的数据表结构如下:
  1. create table relative_subjects
  2. (
  3.     start_year  int          not null,
  4.     subject1    varchar(128) not null,
  5.     subject2    varchar(128) not null,
  6.     paper_count int          not null,
  7.     primary key (start_year, subject1, subject2)
  8. );
复制代码
3.2.3 作者合作簇计算

  我们基于作者图,使用ConnectedComponents算法对活跃作者的合作关系进行聚簇计算。由于数据量过大,我们的运算资源和前端渲染能力都有限,因此我们增加了领域维度(即对每个领域内的论文都生成一张图并进行计算),并且只关注发表论文数最多的前100名作者。GraphX框架提供了ConnectedComponents算法的实现,我们直接调用即可得到作者的分类结果:
  1. val cc = graph.connectedComponents().vertices
复制代码
然后通过join组装上起始年份和领域名称等信息,最后写入MySQL保存。数据表结构如下:
  1. create table author_connections_author
  2. (
  3.     start_year      int          not null,
  4.     subject         varchar(128) not null,
  5.     author_id       bigint       not null,
  6.     author_name     varchar(64)  not null,
  7.     paper_count     int          not null,
  8.     author_category bigint       not null,
  9.     primary key (start_year, subject, author_id)
  10. );
复制代码
3.2.4 作者合作情况统计

  本质上就是把上个计算中活跃作者间的边保存下来。不同于ConnectedComponents算法运行时只需要权重大于等于3的边,此处我们会保存所有权重大于0的边。最后存入的数据表结构如下:
  1. create table collaborations
  2. (
  3.     id            bigint         auto_increment primary key,
  4.     start_year    int            not null,
  5.     subject       varchar(128)   not null,
  6.     source_author bigint         not null,
  7.     target_author bigint         not null,
  8.     primary key id
  9. );
复制代码
3.2.5 作者合作关系搜索

  对于任意两个作者,我们希望找到他们在作者图中的所有连通路径,最大长度限制为4(共经过4条边)。我们引入第三方库graphframe,使用它的图GraphFrame提供的广度优先搜索最短路径功能进行搜索:
  1. val paths = graphFrame.bfs
  2.         .fromExpr(col("author") === author1)
  3.         .toExpr(col("author") === author2)
  4.         .maxPathLength(5)
  5.         .run()
复制代码
它的每次搜索会返回最短路径的数组(因为可能有多条长度相同的边符合条件)。出来的边如果没有达到长度限制,则在图中删掉所有路径经过的点以及相关的边,然后再次进行搜索:
  1. graphFrame = graphFrame.filterVertices(!col("id").isin(idList:_*))
复制代码
迭代式搜索的结果会存入数据库,相应的表结构如下:
  1. create table collaboration_path
  2. (
  3.     id            bigint auto_increment
  4.         primary key,
  5.     start_year    int           not null,
  6.     subject       varchar(128)  not null,
  7.     source_author varchar(64)   not null,
  8.     target_author varchar(64)   not null,
  9.     path          varchar(1024) null comment 'data example=author1,author2,author3'
  10. );
复制代码
3.3 困难和解决方案

  

  • 数据量过大导致运算时间过长。

    • 这是最大的困难,迫使我们多次修改实现方式,以及做出妥协。一开始我们希望统计某领域内所有作者的合作情况并为他们分组,但spark任务运行时间过长导致我们无法在做最终展示前准备好所有数据,数量超过1万的作者也让前端展示的关系图变成了一团乱麻。
    • 解决方案:妥协,只计算单个领域内发表论文数前100的作者的合作簇,以及统计他们的合作情况。

  • 运算结果数据过大以至于无法预先算好并存入MySQL数据库。

    • 对于作者间接合作关系搜索,我们最初的想法是像其他图的计算一样,预先算好结果存入MySQL数据库,让后端直接从中读取结果。但初步实现并测试后发现,这意味着对于m篇有n个作者的论文,我们就有                                                   m                                  ×                                               ∑                                                   i                                        =                                        2                                                                m                                        i                                        n                                        (                                        n                                        ,                                        5                                        )                                                                        A                                     n                                     i                                                      m×\sum_{i=2}^{min(n,5)}A^{i}_n                           m×∑i=2min(n,5)​Ani​条符合条件的边要存!简单计算一下就知道,这意味着仅仅对于10万篇有4个作者(这应该很常见)的论文,我们就要存下共计600万(                                                  100000                                  ×                                  (                                  12                                  +                                  24                                  +                                  24                                  )                                          100000×(12+24+24)                           100000×(12+24+24))条边,而这还不算那些写了多篇论文的作者的额外贡献(这部分应该会更多)!我们及时地在MySQL存了240万条边后停了下来,并为MySQL的坚挺感到庆幸。
    • 解决方案:最终我们选择不再采取预先全部算好的方式,转为在用户请求搜索两个作者间的关系时实时启动spark,只以这两个作者分别作为起点和终点进行计算,结果通过存入MySQL的方式传递给后端。

  • 操作DataFrame时不容易弄清楚返回结果的结构。

    • 与RDD不同,DataFrame无法在源码阶段推断出表头的结构,IDE对此无能为力,为代码的编写和纠错带来不小的困难。
    • 解决方案:通过spark-shell一点点进行单步调试。

  IV. 业务需求实现

  4.1 领域交叉统计

  针对业务问题「统计特定时间段内各个领域和其他领域的交叉程度,并找出交叉程度高的领域」,我们的实现是:用户选择要统计的时间段(默认为5年)之后,调用pagerank算法计算各领域的权重值,并按照该权重值作为该领域和其他领域交叉程度的标准进行排名,返回交叉程度最高的前20个领域,并用表格来展示,相关系统的运行截图如下所示。
    pagerank算法是谷歌公司用户计算网页重要程度的算法,一个网页的权重除了常数之外,只依赖于所有包含了该网页链接的网页的权重值,通过迭代式更新所有网页的权重可以得到较稳定的结果。在pagerank算法中,那些被众多网页所链接的网页具有更高的权重值,而被具有较高权重值的网页所链接的网页也能够获得较高的权重值。
  类似地,我们可以将这个算法应用到科研领域间的关系上,网页的链接可以理解成科研领域的交叉,只不过网页链接是单向的而领域间的交叉是双向的。那些权重值较高的领域,往往和许多领域产生交叉,这为该领域的研究提供了大量的可供探索的空间和亟待解决的问题,该领域拥有广阔的发展前景;和高权重领域产生交叉的领域,意味着该领域本身的研究目标具有一定的价值,可以和交叉程度高的领域产生一定程度的共鸣。通过这种方法,我们了解到了可以和多领域交叉复合的领域,相关领域的工作往往拥有更高的应用价值。
  4.2 相关领域排名

  针对业务问题「统计特定时间段内和某领域交叉程度较高的领域」,我们的实现是:根据用户输入的统计年份数以及在表格中选择的领域信息,我们调用GraphX计算该领域在该时间段内交叉程度最高的10个领域,相关系统的运行截图如下所示。
    “领域交叉统计”按照pagerank权重展示了交叉复合程度高的领域,而本业务需求则是立足于某一特定领域,寻找与关联密切的领域。排名的依据是同属于两个领域的论文数量,数量越多意味着两个领域交叉复合越紧密。在“领域交叉统计”业务需求中没有按照论文数据计算的原因在于,在讨论所有领域的情况下,pagerank能够考虑到关联领域权重对于领域自身权重的影响,突出和“中心领域”关联密切的领域,而单纯依赖论文数量的统计结果不能将相关领域的权重带入本领域权重的计算;但是从特定领域出发寻找相关领域,关注的是和此领域的关联性,所以用跨领域的论文数作为统计依据更加合适。
  通过查找和当前领域交叉复合程度高的领域,可以为特定领域的研究人员提供可能的研究方向,也能够进一步促进研究人员之间的跨领域合作,找到新的值得关注的问题。
  4.3 领域内合作情况

  针对业务问题「探究某领域内作者间的合作情况,寻找各个作者所活跃的课题小组」,我们的实现是:根据用户输入的统计年份数以及在表格中选择的领域信息,我们调用GraphX计算该领域在该时间段内不同作者的分组情况,使用关系图展示结果,每一个节点代表一名作者,节点的大小代表作者的合作者数量,同一颜色的节点表示对应的作者合作关系较为紧密。相关系统的运行截图如下所示。
    该业务需求的目的是在特定领域构建作者的人际关系图,探寻作者间的合作关系,希望了解和某个作者合作紧密的人,以及通过他可以接触到的人(这些人通常和该作者属于同一课题组)。这么做的理由在于:同一个课题组的人会一起发论文,构成一个聚类簇;不同机构的人不会一起发论文,他们就会被分开。
  于是可以使用connectedComponents方法,将图划分成几个连通区域,对临时聚类簇进行合并。因为不同课题组之间可能有交流访问等情况,为了避免“偶然的合作”,在计算聚类簇的时候,只有合作发论文超过3篇的人才真正放在同一聚类簇里。这种做法是基于一个有趣的事实:只合作过1篇论文及以下的情况占据了所有合作情况的90%,而超过3篇论文的“紧密合作”则只占2%。
  通过限制合作数量,能够更好地将作者划分至不同的聚类簇中,了解作者间的合作情况。值得注意的是,在结果展示的时候,不超过3篇论文的“松散合作”仍然会被记入作者的合作者数量中。
  4.4 合作关系探索

  针对业务问题「计算两个作者关系可以通过哪些合作者产生合作,探寻两个作者之间可能的合作关系」,我们的实现是:根据用户输入的统计年份数以及在表格中选择的领域信息,我们调用GraphX计算该领域在该时间段内两名作者可能的合作路径,每条合作路径上两两相邻的作者共同合作发表过论文。相关系统的运行截图如下所示。
    通过合作路径可以清晰直观地展示两名作者间的人际关系,探寻在未来展开合作的可能。例如Yu-Shen Liu希望和在领域内颇具影响力的Yilin Shen合作发表论文,于是可以通过与两人都展开过合作的Yujun Shen的介绍与Yilin Shen建立联系。在这里的合作是一个双向的关系,可以从Yu-Shen Liu出发找到Yilin Shen,也可以从Yilin Shen出发找到Yu-Shen Liu。
  合作者探索的搜索过程中使用了GraphX的广度优先搜索(bfs)算法,为了避免资源的消耗,需要设置最大路径长度。在系统中,该路径长度被设定为4,也就是说除了查找的两名作者之外,最多只能有3个中间人,这么做的理由是过长的合作路径会导致较大的沟通成本,无法展开有效的合作。

来源:https://blog.caogenba.net/newlw/article/details/122442185
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作