{"id":1306,"date":"2022-03-02T23:13:38","date_gmt":"2022-03-02T15:13:38","guid":{"rendered":"http:\/\/www.eait.co\/?p=1306"},"modified":"2022-03-03T10:36:15","modified_gmt":"2022-03-03T02:36:15","slug":"flume%e5%90%84%e7%a7%8d%e6%8b%a6%e6%88%aa%e5%99%a8%ef%bc%88%e4%ba%8c%ef%bc%89-%e6%ad%a3%e5%88%99%e5%92%8c%e8%87%aa%e5%ae%9a%e4%b9%89%e6%8b%a6%e6%88%aa%e5%99%a8","status":"publish","type":"post","link":"https:\/\/notes.coremix.net\/?p=1306","title":{"rendered":"Flume\u5404\u79cd\u62e6\u622a\u5668\uff08\u4e8c\uff09\u2014\u2014\u6b63\u5219\u548c\u81ea\u5b9a\u4e49\u62e6\u622a\u5668"},"content":{"rendered":"<h4><span style=\"font-family: \u5b8b\u4f53;\">\u67e5\u8be2\u66ff\u6362\u62e6\u622a\u5668<\/span><\/h4>\n<p>&nbsp;<\/p>\n<p>search.conf<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\n#1 agent\r\na1.sources = r1\r\na1.sinks = k1\r\na1.channels = c1\r\n\r\n#2 source\r\na1.sources.r1.type = exec\r\na1.sources.r1.channels = c1\r\na1.sources.r1.command = tail -F \/opt\/Shalltest\r\na1.sources.r1.interceptors = i1\r\na1.sources.r1.interceptors.i1.type = search_replace\r\n\r\n#\u9047\u5230\u6570\u5b57\u6539\u6210shalltest\uff0cA123\u4f1a\u66ff\u6362\u4e3aAshalltest\r\na1.sources.r1.interceptors.i1.searchPattern = &#x5B;0-9]+\r\na1.sources.r1.interceptors.i1.replaceString = shalltest\r\na1.sources.r1.interceptors.i1.charset = UTF-8\r\n\r\n#3 sink\r\na1.sinks.k1.type = logger\r\n\r\n#4 Channel\r\na1.channels.c1.type = memory\r\na1.channels.c1.capacity = 1000\r\na1.channels.c1.transactionCapacity = 100\r\n\r\n#5 bind\r\na1.sources.r1.channels = c1\r\na1.sinks.k1.channel = c1\r\n<\/pre>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\nbin\/flume-ng agent -c conf\/ -f jobconf\/search.conf -n a1 -Dflume.root.logger=INFO,console\r\n<\/pre>\n<h4><span style=\"font-family: \u5b8b\u4f53;\">\u6b63\u5219\u8fc7\u6ee4\u62e6\u622a\u5668<\/span><\/h4>\n<p>filter.conf<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\n#1 
agent\r\na1.sources = r1\r\na1.sinks = k1\r\na1.channels = c1\r\n\r\n#2 source\r\na1.sources.r1.type = exec\r\na1.sources.r1.channels = c1\r\na1.sources.r1.command = tail -F \/opt\/Shalltest\r\na1.sources.r1.interceptors = i1\r\na1.sources.r1.interceptors.i1.type = regex_filter\r\na1.sources.r1.interceptors.i1.regex = ^A.*\r\n#\u5982\u679cexcludeEvents\u8bbe\u4e3afalse,\u8868\u793a\u8fc7\u6ee4\u6389\u4e0d\u662f\u4ee5A\u5f00\u5934\u7684events\u3002\u5982\u679cexcludeEvents\u8bbe\u4e3atrue\uff0c\u5219\u8868\u793a\u8fc7\u6ee4\u6389\u4ee5A\u5f00\u5934\u7684events\u3002\r\na1.sources.r1.interceptors.i1.excludeEvents = true\r\n\r\na1.sinks.k1.type = logger\r\n\r\na1.channels.c1.type = memory\r\na1.channels.c1.capacity = 1000\r\na1.channels.c1.transactionCapacity = 100\r\n\r\na1.sources.r1.channels = c1\r\na1.sinks.k1.channel = c1\r\n\r\n<\/pre>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\nbin\/flume-ng agent -c conf\/ -f jobconf\/filter.conf -n a1 -Dflume.root.logger=INFO,console\r\n<\/pre>\n<h4><span style=\"font-family: \u5b8b\u4f53;\">\u6b63\u5219\u62bd\u53d6\u62e6\u622a\u5668<\/span><\/h4>\n<p>extractor.conf<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\n\r\n#1 agent\r\na1.sources = r1\r\na1.sinks = k1\r\na1.channels = c1\r\n\r\n#2 source\r\na1.sources.r1.type = exec\r\na1.sources.r1.channels = c1\r\na1.sources.r1.command = tail -F \/opt\/Shalltest\r\na1.sources.r1.interceptors = i1\r\na1.sources.r1.interceptors.i1.type = regex_extractor\r\na1.sources.r1.interceptors.i1.regex = hostname is (.*?) 
ip is (.*)\r\na1.sources.r1.interceptors.i1.serializers = s1 s2\r\na1.sources.r1.interceptors.i1.serializers.s1.name = hostname\r\na1.sources.r1.interceptors.i1.serializers.s2.name = ip\r\n\r\na1.sinks.k1.type = logger\r\n\r\na1.channels.c1.type = memory\r\na1.channels.c1.capacity = 1000\r\na1.channels.c1.transactionCapacity = 100\r\n\r\na1.sources.r1.channels = c1\r\na1.sinks.k1.channel = c1\r\n<\/pre>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\nbin\/flume-ng agent -c conf\/ -f jobconf\/extractor.conf -n a1 -Dflume.root.logger=INFO,console\r\n<\/pre>\n<h4><span style=\"font-family: \u5b8b\u4f53;\">\u81ea\u5b9a\u4e49\u62e6\u622a\u5668<\/span><\/h4>\n<p>maven\u914d\u7f6e\u6587\u4ef6<\/p>\n<pre class=\"brush: xml; title: ; notranslate\" title=\"\">\r\n&lt;dependencies&gt;\r\n        &lt;!-- flume\u6838\u5fc3\u4f9d\u8d56 --&gt;\r\n        &lt;dependency&gt;\r\n            &lt;groupId&gt;org.apache.flume&lt;\/groupId&gt;\r\n            &lt;artifactId&gt;flume-ng-core&lt;\/artifactId&gt;\r\n            &lt;version&gt;1.8.0&lt;\/version&gt;\r\n        &lt;\/dependency&gt;\r\n    &lt;\/dependencies&gt;\r\n    &lt;build&gt;\r\n    &lt;plugins&gt;\r\n    &lt;!-- \u6253\u5305\u63d2\u4ef6 --&gt;\r\n    &lt;plugin&gt;\r\n        &lt;groupId&gt;org.apache.maven.plugins&lt;\/groupId&gt;\r\n        &lt;artifactId&gt;maven-jar-plugin&lt;\/artifactId&gt;\r\n        &lt;version&gt;2.4&lt;\/version&gt;\r\n        &lt;configuration&gt;\r\n            &lt;archive&gt;\r\n                &lt;manifest&gt;\r\n                    &lt;addClasspath&gt;true&lt;\/addClasspath&gt;\r\n                    &lt;classpathPrefix&gt;lib\/&lt;\/classpathPrefix&gt;\r\n                    &lt;mainClass&gt;&lt;\/mainClass&gt;\r\n                &lt;\/manifest&gt;\r\n            &lt;\/archive&gt;\r\n        &lt;\/configuration&gt;\r\n    &lt;\/plugin&gt;\r\n    &lt;!-- \u7f16\u8bd1\u63d2\u4ef6 --&gt;\r\n    &lt;plugin&gt;\r\n    
&lt;groupId&gt;org.apache.maven.plugins&lt;\/groupId&gt;\r\n        &lt;artifactId&gt;maven-compiler-plugin&lt;\/artifactId&gt;\r\n        &lt;configuration&gt;\r\n            &lt;source&gt;1.8&lt;\/source&gt;\r\n            &lt;target&gt;1.8&lt;\/target&gt;\r\n            &lt;encoding&gt;utf-8&lt;\/encoding&gt;\r\n        &lt;\/configuration&gt;\r\n    &lt;\/plugin&gt;\r\n    &lt;\/plugins&gt;\r\n    &lt;\/build&gt;\r\n<\/pre>\n<p><strong>Java\u4ee3\u7801\uff1a<\/strong><\/p>\n<pre class=\"brush: java; title: ; notranslate\" title=\"\">\r\npackage ToUpCase;\r\n\r\nimport org.apache.flume.Context;\r\nimport org.apache.flume.Event;\r\nimport org.apache.flume.interceptor.Interceptor;\r\n\r\nimport java.util.ArrayList;\r\nimport java.util.List;\r\n\r\npublic class MyInterceptor implements Interceptor {\r\n    @Override\r\n    public void initialize() {\r\n\r\n    }\r\n\r\n    @Override\r\n    public Event intercept(Event event) {\r\n        byte&#x5B;] body = event.getBody();\r\n        event.setBody(new String(body).toUpperCase().getBytes());\r\n        return event;\r\n    }\r\n\r\n    @Override\r\n    public List&lt;Event&gt; intercept(List&lt;Event&gt; list) {\r\n        List&lt;Event&gt; events = new ArrayList&lt;&gt;();\r\n        for (Event event : list) {\r\n            events.add(intercept(event));\r\n        }\r\n        return events;\r\n    }\r\n\r\n\r\n\r\n    @Override\r\n    public void close() {\r\n\r\n    }\r\n\r\n    public static class Builder implements Interceptor.Builder{\r\n\r\n        @Override\r\n        public Interceptor build() {\r\n            return new MyInterceptor();\r\n        }\r\n\r\n        @Override\r\n        public void configure(Context context) {\r\n\r\n        }\r\n    }\r\n\r\n}\r\n\r\n<\/pre>\n<p>ToUpCase.conf<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\n\r\n#1.agent \r\na1.sources = r1\r\na1.sinks =k1\r\na1.channels = c1\r\n\r\n\r\n# Describe\/configure the source\r\na1.sources.r1.type = 
exec\r\na1.sources.r1.command = tail -F \/opt\/Shalltest\r\na1.sources.r1.interceptors = i1\r\n#\u5168\u7c7b\u540d$Builder\r\na1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder\r\n\r\n# Describe the sink\r\na1.sinks.k1.type = hdfs\r\na1.sinks.k1.hdfs.path = \/ToUpCase1\r\na1.sinks.k1.hdfs.filePrefix = events-\r\na1.sinks.k1.hdfs.round = true\r\na1.sinks.k1.hdfs.roundValue = 10\r\na1.sinks.k1.hdfs.roundUnit = minute\r\na1.sinks.k1.hdfs.rollInterval = 3\r\na1.sinks.k1.hdfs.rollSize = 20\r\na1.sinks.k1.hdfs.rollCount = 5\r\na1.sinks.k1.hdfs.batchSize = 1\r\na1.sinks.k1.hdfs.useLocalTimeStamp = true\r\n#\u751f\u6210\u7684\u6587\u4ef6\u7c7b\u578b\uff0c\u9ed8\u8ba4\u662f SequenceFile\uff0c\u53ef\u7528 DataStream\uff0c\u5219\u4e3a\u666e\u901a\u6587\u672c\r\na1.sinks.k1.hdfs.fileType = DataStream\r\n\r\n# Use a channel which buffers events in memory\r\na1.channels.c1.type = memory\r\na1.channels.c1.capacity = 1000\r\na1.channels.c1.transactionCapacity = 100\r\n\r\n# Bind the source and sink to the channel\r\na1.sources.r1.channels = c1\r\na1.sinks.k1.channel = c1\r\n<\/pre>\n<p>&nbsp;<\/p>\n<p>\u542f\u52a8\uff0c\u5373\u53ef\u770b\u5230HDFS\u91cc\u9762\u6709\u4e86\u6587\u4ef6\u5939\uff0c\u5e76\u5df2\u7ecf\u8f6c\u5316\u6210\u5927\u5199<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\nbin\/flume-ng agent -c conf\/ -n a1 -f jar\/ToUpCase.conf -C jar\/vip.isok-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console\r\n<\/pre>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n","protected":false},"excerpt":{"rendered":"<p>\u67e5\u8be2\u66ff\u6362\u62e6\u622a\u5668 &nbsp; search.conf #1 agent a1.sources = r1 a1. 
[&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-1306","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"blocksy_meta":[],"_links":{"self":[{"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/posts\/1306","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1306"}],"version-history":[{"count":7,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/posts\/1306\/revisions"}],"predecessor-version":[{"id":1313,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=\/wp\/v2\/posts\/1306\/revisions\/1313"}],"wp:attachment":[{"href":"https:\/\/notes.coremix.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1306"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1306"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/notes.coremix.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1306"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}