{"id":1457,"date":"2022-06-16T20:51:05","date_gmt":"2022-06-16T12:51:05","guid":{"rendered":"http:\/\/www.eait.co\/?p=1457"},"modified":"2022-06-16T22:12:46","modified_gmt":"2022-06-16T14:12:46","slug":"scala-%e6%97%a5%e5%bf%97%e7%bb%9f%e8%ae%a1%e4%bb%a3%e7%a0%81-%e6%95%b0%e6%8d%ae%e5%ba%93%e8%bf%9e%e6%8e%a5","status":"publish","type":"post","link":"https:\/\/notes.coremix.net\/?p=1457","title":{"rendered":"scala \u65e5\u5fd7\u7edf\u8ba1\u4ee3\u7801 \u81ea\u5b9a\u4e49\u5206\u533a \u6570\u636e\u5e93\u8fde\u63a5"},"content":{"rendered":"<p>&nbsp;<\/p>\n<pre class=\"brush: scala; title: ; notranslate\" title=\"\">\r\npackage customPartition0103\r\n\r\npackage customPartition0103\r\n\r\nimport org.apache.spark.rdd.RDD\r\nimport org.apache.spark.{SparkConf, SparkContext}\r\n\r\nobject getLogCount extends App {\r\n  \/\/\u83b7\u53d6spark\u7684sc\r\n  var conf = new SparkConf().setAppName(&quot;Logcount&quot;).setMaster(&quot;local&quot;)\r\n  var sc = new SparkContext(conf)\r\n  \/\/ \u8bfb\u53d6\u6587\u4ef6\uff1a\r\n  private val linerdd= sc.textFile(&quot;C:\\\\Users\\\\xabcd\\\\Desktop\\\\localhost_access_log.txt&quot;)\r\n\r\n  val rdd1 = linerdd.map(line=&gt;{\r\n    \/\/ \u62ff\u4e24\u4e2a\u5f15\u53f7\u4e4b\u95f4\u7684\u6570\u636e\r\n    val index1 = line.indexOf(&quot;\\&quot;&quot;)\r\n    val index22 = line.lastIndexOf(&quot;\\&quot;&quot;)\r\n    val line1 = line.substring(index1+1,index22)\r\n\r\n    \/\/\u83b7\u53d6\u4e24\u4e2a\u7a7a\u683c\u4e4b\u95f4\u7684\u6570\u636e\r\n    val index3 = line1.indexOf(&quot; &quot;)\r\n    val index4 = line1.lastIndexOf(&quot; &quot;)\r\n    val line2 = line1.substring(index3+1,index4)\r\n\r\n    \/\/\u83b7\u53d6jsp\u7684\u540d\u5b57\r\n    val name = line2.substring(line2.indexOf(&quot;\/&quot;)+1)\r\n    \/\/\u4e4b\u6240\u4ee5\u8981\u7528\u8fd9\u79cd\u65b9\u6cd5\uff0c\u56e0\u4e3a\u6bd4split\u66f4\u52a0\u8282\u7ea6\u5185\u5b58\u4e5f\u63d0\u9ad8\u901f\u5ea6\uff08\u7b97\u6cd5\u601d\u60f3\uff09\r\n    (name,1)\r\n\r\n  })\r\n  
\/\/\u805a\u5408\r\n  private val rdd2: RDD&#x5B;(String, Int)] = rdd1.reduceByKey(_+_)\r\n  private val result: RDD&#x5B;(String, Int)] = rdd2.sortBy(_._2,false)\r\n  \/\/\u6253\u5370\r\n  result.foreach(println)\r\n  sc.stop()\r\n}\r\n<\/pre>\n<p>&nbsp;<\/p>\n<p><strong>\u00a0\u81ea\u5b9a\u4e49\u5206\u533a\uff1a<\/strong><\/p>\n<p>&nbsp;<\/p>\n<pre class=\"brush: scala; title: ; notranslate\" title=\"\">\r\npackage customPartition0103\r\n\r\nimport org.apache.spark.rdd.RDD\r\nimport org.apache.spark.{Partitioner, SparkConf, SparkContext}\r\n\r\nimport scala.collection.mutable\r\n\r\n\/**\r\n * \u81ea\u5b9a\u4e49\u5206\u533a\r\n * \u6309\u7167\u7f51\u9875\u7684\u540d\u5b57\u6765\u5206\u533a\r\n * *\/\r\nobject selfPartition {\r\n  def main(args: Array&#x5B;String]): Unit = {\r\n  \/\/\u83b7\u53d6spark\u7684sc\r\n  var conf = new SparkConf().setAppName(&quot;Logcount&quot;).setMaster(&quot;local&quot;)\r\n\/\/  var conf = new SparkConf().setAppName(&quot;Logcount&quot;).setMaster(&quot;spark:\/\/bigdata166:7077&quot;)\r\n  var sc = new SparkContext(conf)\r\n  \/\/ \u8bfb\u53d6\u6587\u4ef6\uff1a\r\n  val linerdd= sc.textFile(&quot;C:\\\\Users\\\\xabcd\\\\Desktop\\\\localhost_access_log.txt&quot;)\r\n\/\/  private val linerdd= sc.textFile(&quot;\/testdata\/localhost_access_log.txt&quot;) \/\/\u6d4b\u8bd5\u5931\u8d25\uff0c\u53ef\u80fd\u8981\u6253jar\u5305\r\n\r\n  val rdd1 = linerdd.map(line=&gt;{\r\n    \/\/ \u62ff\u4e24\u4e2a\u5f15\u53f7\u4e4b\u95f4\u7684\u6570\u636e\r\n    val index1 = line.indexOf(&quot;\\&quot;&quot;)\r\n    val index22 = line.lastIndexOf(&quot;\\&quot;&quot;)\r\n    val line1 = line.substring(index1+1,index22)\r\n\r\n    \/\/\u83b7\u53d6\u4e24\u4e2a\u7a7a\u683c\u4e4b\u95f4\u7684\u6570\u636e\r\n    val index3 = line1.indexOf(&quot; &quot;)\r\n    val index4 = line1.lastIndexOf(&quot; &quot;)\r\n    val line2 = line1.substring(index3+1,index4)\r\n\r\n    \/\/\u83b7\u53d6jsp\u7684\u540d\u5b57\r\n    val name = 
line2.substring(line2.indexOf(&quot;\/&quot;)+1)\r\n    \/\/\u4e4b\u6240\u4ee5\u8981\u7528\u8fd9\u79cd\u65b9\u6cd5\uff0c\u56e0\u4e3a\u6bd4split\u66f4\u52a0\u8282\u7ea6\u5185\u5b58\u4e5f\u63d0\u9ad8\u901f\u5ea6\uff08\u7b97\u6cd5\u601d\u60f3\uff09\r\n    (name,1)\r\n\r\n  })\r\n  \/\/\u6570\u636e\u8f6c\u6362rdd\u8f6c\u6362\u4e3a\u6570\u7ec4:jsp\u540d\u5b57\u7684\u6570\u7ec4\r\n  val jspList = rdd1.map(_._1).distinct().collect()\r\n  \/\/\u81ea\u5b9a\u4e49\u5206\u533a\uff1a\u5206\u533a\u89c4\u5219\uff0c\u9700\u8981\u4e00\u4e2a\u65b0\u7684\u7c7b\r\n  var myPartition = new MyPartition(jspList)\r\n\r\n  var rdd3 = rdd1.partitionBy(myPartition)\r\n  \/\/\u8f93\u51fa\r\n  rdd3.saveAsTextFile(&quot;D:\\\\testdata\\\\streaming\\\\TomcatLogPartition&quot;)\r\n  sc.stop()}\r\n}\r\n\r\n\r\n\r\nclass MyPartition(jspList:Array&#x5B;String]) extends Partitioner{\r\n  \/\/\u5b9a\u4e49\u4e00\u4e2a\u4fdd\u5b58\u5206\u533a\u7684\u6761\u4ef6\u7684\u96c6\u5408\r\n  val partitionMap = new mutable.HashMap&#x5B;String,Int]()\r\n  \/\/\u5206\u533a\u53f7\r\n  var indexId = 0\r\n\r\n  for(jsp &lt;- jspList){\r\n    partitionMap.put(jsp,indexId)\r\n    indexId+=1\r\n  }\r\n  \/\/\u8fd4\u56de\u5206\u533a\u4e2a\u6570\r\n  override def numPartitions: Int = partitionMap.size\r\n  \/\/\u6839\u636e\u540d\u5b57\uff0c\u8fd4\u56de\u5bf9\u5e94\u7684\u5206\u533a\r\n  override def getPartition(key: Any): Int = {\r\n    partitionMap.getOrElse(key.toString,0)\r\n  }\r\n}\r\n<\/pre>\n<p>&nbsp;<\/p>\n<p><strong>\u6570\u636e\u5e93\u5199\u5165\uff1a<\/strong><br \/>\n&nbsp;<\/p>\n<pre class=\"brush: scala; title: ; notranslate\" title=\"\">\r\npackage customPartition0103\r\n\r\nimport java.sql.{Connection, DriverManager, PreparedStatement}\r\n\r\nimport org.apache.spark.{SparkConf, SparkContext}\r\n\r\n\/**\r\n * \u5c06\u6570\u636e\u5199\u8fdbMySQL\u4e2d\r\n * \u5148\u53bbMySQL\u4e2d\u521b\u5efa\u8868\r\n * *\/\r\nobject writeTomysql{\r\n  var conn: Connection = null\r\n  var ppsm:PreparedStatement = 
null\r\n\r\n  def main(args: Array&#x5B;String]): Unit = {\r\n    \/\/\u83b7\u53d6spark\u7684\u73af\u5883\r\n    \/\/\u83b7\u53d6spark\u7684sc\r\n    var conf = new SparkConf().setAppName(&quot;count&quot;).setMaster(&quot;local&quot;)\r\n    var sc = new SparkContext(conf)\r\n\r\n    \/\/\u89e3\u6790\u6570\u636e\r\n    \/\/1.\u8bfb\u53d6\u6587\u4ef6\r\n    var linerdd = sc.textFile(&quot;C:\\\\Users\\\\xabcd\\\\Desktop\\\\localhost_access_log.txt&quot;)\r\n\r\n    \/\/2.\u89e3\u6790\u65e5\u5fd7\uff1a\u7f51\u9875\u540d\u79f0\r\n    \/**\r\n     * 192.168.88.1 - - &#x5B;30\/Jul\/2017:12:53:43 +0800] &quot;GET \/MyDemoWeb\/head.jsp HTTP\/1.1&quot; 200 713\r\n     * \u7f51\u9875\u540d\u79f0:MyDemoWeb\/head.jsp\r\n     * *\/\r\n    var rdd1 = linerdd.map(line =&gt;{\r\n      \/\/1.\u83b7\u53d6\u4e24\u4e2a\u5f15\u53f7\u4e4b\u95f4\u7684\u6570\u636e\r\n      var index1 = line.indexOf(&quot;\\&quot;&quot;)\r\n      var index2 = line.lastIndexOf(&quot;\\&quot;&quot;)\r\n      var line1 = line.substring(index1+1,index2)\/\/  GET \/MyDemoWeb\/head.jsp HTTP\/1.1\r\n\r\n      \/\/2.\u83b7\u53d6\u4e24\u4e2a\u7a7a\u683c\u4e4b\u95f4\u7684\u6570\u636e\r\n      var index3 = line1.indexOf(&quot; &quot;)\r\n      var index4 = line1.lastIndexOf(&quot; &quot;)\r\n      var line2 = line1.substring(index3+1,index4)\/\/ \/MyDemoWeb\/head.jsp\r\n      \/\/3.\u83b7\u53d6jsp\u7684\u540d\u5b57\r\n      var name = line2.substring(line2.indexOf(&quot;\/&quot;)+1)\r\n\r\n      (name,1)\r\n\r\n    }\r\n    )\r\n\r\n    \/\/3.\u805a\u5408\r\n\r\n    var rdd2 = rdd1.reduceByKey(_+_)\r\n    \/\/4.\u6392\u5e8f\uff0c\u8bbf\u95ee\u91cf\u964d\u5e8f\r\n    var result = rdd2.sortBy(_._2,false)\r\n    \/\/\u521b\u5efaMySQL\u94fe\u63a5,\r\n    \/\/\u95ee\u9898\uff1a\u4e00\u6761\u6570\u636e\u94fe\u63a5\u4e00\u4e2aMySQL\uff0c\u5bf9MySQL\u7684\u538b\u529b\u4f1a\u6bd4\u8f83\u5927\r\n    \/*result.foreach(\r\n      t =&gt;{\r\n        \/\/1.\u83b7\u53d6MySQL\u94fe\u63a5\r\n        conn = 
DriverManager.getConnection(&quot;jdbc:mysql:\/\/192.168.2.111:3306\/company?serverTimezone=UTC&amp;characterEncoding=utf-8&quot;,&quot;root&quot;,&quot;000000&quot;)\r\n        ppsm = conn.prepareStatement(&quot;insert into mydata1 values(?,?)&quot;)\r\n        \/\/\u5f80MySQL\u5199\u6570\u636e\r\n        ppsm.setString(1,t._1)\r\n        ppsm.setInt(2,t._2)\r\n        ppsm.executeUpdate()\r\n      }\r\n    )*\/\r\n\r\n    \/\/\u901a\u8fc7\u5206\u533a\u6765\u907f\u514d\r\n    result.foreachPartition(myconn)\r\n    sc.stop()\r\n  }\r\n\r\n\r\n  def myconn(t:Iterator&#x5B;(String,Int)]){\r\n    \/\/\u5148\u83b7\u53d6\u94fe\u63a5\r\n    try{\r\n      conn = DriverManager.getConnection(&quot;jdbc:mysql:\/\/192.168.43.166:3306\/company?serverTimezone=UTC&amp;characterEncoding=utf-8&quot;,&quot;root&quot;,&quot;000000&quot;)\r\n      ppsm = conn.prepareStatement(&quot;insert into logcount0103 values(?,?)&quot;)\r\n      t.foreach(\r\n        it=&gt;{\r\n          ppsm.setString(1,it._1)\r\n          ppsm.setInt(2,it._2)\r\n          ppsm.executeUpdate()\r\n        }\r\n      )\r\n    }catch {\r\n      case t:Throwable =&gt; t.printStackTrace()\r\n    }finally {\r\n      if(ppsm != null) ppsm.close()\r\n      if(conn != null) conn.close()\r\n    }\r\n\r\n  }\r\n}\r\n\r\n<\/pre>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n","protected":false},"excerpt":{"rendered":"<p>&nbsp; package customPartition0103 package customPartit 
[&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-1457","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"blocksy_meta":[],"_links":{"self":[{"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/posts\/1457","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1457"}],"version-history":[{"count":4,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/posts\/1457\/revisions"}],"predecessor-version":[{"id":1461,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/posts\/1457\/revisions\/1461"}],"wp:attachment":[{"href":"https:\/\/notes.coremix.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1457"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1457"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1457"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}