范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Kafka系统与ELK的整合(八)

  我们使用Apache Flume来采集数据到Kafka中进行存储,最后在ELK中展示出来。到http://flume.apache.org/的地址下载Apache Flume,下载后部署在日志的服务器。下载后进行解压以及配置到环境变量中。整体思路是在拉勾网搜索"测试开发工程师",把获取到的结果信息存储到Kafka的系统中,最后展示在ELK中。下面具体配置这些信息。在conf的目录下编辑文件,文件内容为:#设置代理名agent.sources=s1 agent.channels=c1 agent.sinks=k1  #设置收集方式agent.sources.s1.type=exec agent.sources.s1.command=tail -F  /Applications/devOps/bigData/ELK/apache-flume/logs/apps.log agant.sources.s1.channels=c1 agent.channels.c1.type=memory agent.channels.c1.capacity=10000apage.channels.c1.transactionCapacity=100#设置kafka接收器agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink #设置kafka的broker和端口号agent.sinks.k1.brokerList=localhost:9092#设置kafka的topicagent.sinks.k1.topic=laGou #设置序列化agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder #指定管道名agent.sinks.k1.channel=c1
  这里使用的主题是laGou,切记此时需要启动Kafka。下来启动Apache Flume,在apache-flume/bin的执行如下命令来启动,命令为:flume-ng agent -n agent --conf conf --conf-file ../conf/flume-kafka.properties  -Dflume.root.logger=DEBUG,CONSOLE
  执行后,输出如下的信息:
  下来使用分流数据的方式来实现数据的展示,具体可以理解为把采集到的数据存储到Kafka系统中,然后使用LogStash来消费Kafka存储的数据,并将消费后的数据存储到ElasticSearch中。下来配置logstash.yml的文件,配置LogStash账户和密码,具体如下:
  配置kafka_laGou.conf,具体内容为:
  配置完成后,在控制台中LogStach来消费Kafka集群中主题为laGou的数据,到LogStash的bin目录下执行:./logstash -f ../config/kafka_laGou.conf
  执行后,LogStash的Agent将正常启动并消费Kafka集群中的数据,然后把消费后的数据存储到ElasticSearch集群中,执行后,输出如下信息:Sending Logstash"s logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties[2021-06-12T18:39:43,175][WARN ][logstash.config.source.multilocal] Ignoring the "pipelines.yml" file because modules or command line options are specified [2021-06-12T18:39:43,210][FATAL][logstash.runner          ] Logstash could not be started because there is already another instance using the configured data directory.  If you wish to run multiple instances, you must change the "path.data" setting. [2021-06-12T18:39:43,221][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exitlocalhost:bin liwangping$ clear localhost:bin liwangping$ ./logstash -f ../config/kafka_laGou.conf Sending Logstash"s logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties[2021-06-12T18:40:31,712][WARN ][logstash.config.source.multilocal] Ignoring the "pipelines.yml" file because modules or command line options are specified [2021-06-12T18:40:32,136][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.3.2"} [2021-06-12T18:40:33,674][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50} [2021-06-12T18:40:34,092][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@localhost:9200/]}} [2021-06-12T18:40:34,111][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@localhost:9200/, :path=>"/"} [2021-06-12T18:40:34,426][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@localhost:9200/"} [2021-06-12T18:40:34,505][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6} [2021-06-12T18:40:34,508][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won"t be used to determine the document _type {:es_version=>6}[2021-06-12T18:40:34,528][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]} [2021-06-12T18:40:34,544][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil} [2021-06-12T18:40:34,561][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}} [2021-06-12T18:40:34,584][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#"} [2021-06-12T18:40:34,670][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash [2021-06-12T18:40:34,676][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2021-06-12T18:40:34,691][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values: auto.commit.interval.ms = 5000auto.offset.reset = latest bootstrap.servers = [localhost:9092] check.crcs = true client.id = logstash-0connections.max.idle.ms = 540000enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800fetch.max.wait.ms = 500fetch.min.bytes = 1group.id = console-consumer-83756heartbeat.interval.ms = 3000interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576max.poll.interval.ms = 300000max.poll.records = 500metadata.max.age.ms = 300000metric.reporters = [] metrics.num.samples = 2metrics.recording.level = INFO metrics.sample.window.ms = 30000partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 305000retry.backoff.ms = 100sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072session.timeout.ms = 10000ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer  [2021-06-12T18:40:34,797][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 1.1.0[2021-06-12T18:40:34,798][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : fdcf75ea326b8e07 [2021-06-12T18:40:35,011][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: E0qvXyu_T_Wr_vZgZUV80w [2021-06-12T18:40:35,024][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null) [2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Revoking previously assigned partitions [] [2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] (Re-)joining group [2021-06-12T18:40:35,047][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600} [2021-06-12T18:40:35,149][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Successfully joined group with generation 1[2021-06-12T18:40:35,151][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Setting newly assigned partitions [laGou-0, laGou-1, laGou-2, laGou-3, laGou-4, laGou-5] [2021-06-12T18:40:35,168][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-0 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-1 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-2 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-3 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-4 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-5 to offset 0.
  此时,在Kafka的监控系统中可以看到主题laGou消费的详细信息,如下所示:
  下来实现数据的可视化,把数据存储到ElasticSearch的集群后,就可以通过Kibana来查询和分析数据。在ManageMent里面创建索引后,点击Discover模块,然后就会展示消费到的拉勾网的测试开发职位的数据,如下所示:
  可以使用不同的索引来查询,比如使用message来查询,就会显示如下的信息:
  当然也可以点击查看完整的数据,点击向右的箭头,就可以使用table格式和JSON格式来展示具体的数据。
  感谢您的阅读和关注,后续会持续更新!

盛世美颜无需精修,vivoS10Pro助你get海报级自拍在自拍手机领域,vivo无疑是最具认可度与知名度的品牌之一。旗下的S系列自拍旗舰,是许多爱美小姐姐的首选自拍利器。在本月中旬,vivo则推出了新款自拍旗舰vivoS10系列,再度为荣耀30S更多信息曝光,将无缘UFS3。1LPDDR5提起奥利奥,相信大家第一时间想到的应该是饼干,而小编就不一样了,第一个想到的即将要发布的手机荣耀30S。为什么小编会想到这款手机呢?主要是因为这款手机被曝光出来的后置摄像头采用的是AGMG1系列手机来了,能扛40低温,能耐60高温,售价3699元起欧界报道曾经被称之为地表最强三防手机的品牌AGM,在10月28日推出了全新的G系列旗舰机型AGMG1和AGMG1Pro。和之前的X系列有所不同,G系列的性能更加实用和硬核。当然了,啊哈!算法一本能看懂的算法书算法在一般人眼中都是很难而且枯燥的形象,这本啊哈!算法用通俗易懂的语言,配合有趣的插图让你在阅读本书的时候更像是在品读一篇篇轻松的短篇小说或是在玩一把题味解谜游戏,在轻松愉悦中掌握白送都没人的两款华为手机!最后一款真的是绝了今天小编突发奇想,想看看华为3000到4000块价位段的手机是什么样的,评分最高和最低的两款手机到底有啥区别。你别说,嘿,就让我给找到了。左边的这张是华为P30,2019年4月上市云徙数盈COO苗宇丨数盈品牌的诞生与运营服务演讲人苗宇编辑小徙在我们基于大量头部灯塔企业的创新实践中,在赋能成长型企业的数字营销的旅程中,我们探索出另一条新的产品矩阵,和一套新的服务模式。对于企业而言,面对新生命的诞生,我们618的电梯广告可太魔性了,原来电梯广告还能这样随着一年一度的618购物节的来临,如今各个电商以及广告公司都在大力宣传有关618购物节的相关信息。其中最让我印象深刻的莫过于电梯里的当贝宣传广告了,不管是去公司上班去商场购物还是回罗兰贝格发布全行业趋势报告,超级产品公司将称霸202116大行业2021年发展趋势预测。报告来源罗兰贝格2021年元旦后这几天,不少群都在转发这份报告这份报告是国际知名管理咨询公司罗兰贝格(RolandBerger)在12月31日发布Windows7和8。1系统免费升级到Windows10据悉,微软让客户将系统升级到Windows10的策略之一是免费提供。微软表示,限于发行的第一年,客户可以免费将运行Windows7或Windows8。1正版版本的系统免费升级到Wi一文解读手机操作系统的演变史199920212020年随着华为鸿蒙系统2。0的陆续上线,我们的手机操作系统迎来安卓系统苹果系统三国鼎立的市场格局,若让我点评华为鸿蒙2。0系统的话,我更喜欢把她比作如三国中的吴国进可以战,退可云徙创始人包志刚2020新年致辞洞见未来比拼速度徙讯企业级服务,最终决胜的因素在于快,我们的市场营销要快,我们的解决方案要快,产品技术的迭代要快,交付效率要快。谁能更快,才能致胜于未来!云徙科技创始人CEO包志刚在洞见未来比拼速度的
Vlog创作者的摄影包里装些什么?今天想要和大家来分享一下,一个摄影爱好者,或者说业余摄影师的包里都装了些什么?主要分为以下几个方面,分别是摄影包的介绍相机镜头辅助配件等几个方面。那么话不多说,就让我们开始吧!摄影手机NFC是什么?怎么使用?NFC基本上已经成为了安卓手机的标配,几乎人人的手机上都有NFC,但是很少有人知道,也很少有人去使用这个功能,其实NFC的功能非常的强大,不会使用的话就太可惜了。那接下来就给大家分说一下用512GB手机的感受?韩版s108512!说实话我拍照不多,游戏不玩,下载音乐也不多,视频都在线看,原来被32G和64G的苹果恶心到了,整天清微信!果断买了个大的,用了三个月了,后台重来不关,现在12G夜跑的最佳拍档,这个夏天要和HAKIIACTION一起度过了嘿!不知不觉天气又到了最热的时候,操场上夜跑的人变得多了起来。枯燥的跑步怎么能没有音乐的陪伴呢,虽说以前的耳机佩戴舒适度已经很不错了,但是在跑动过程中还是比较担心耳机跌落,心里总会搞事108MP骁龙870潜望长焦这新机有OPPOrealme内味了今年1月,在海外市场有着不错份额的Motorola摩托罗拉算是正式宣告旗舰系列回国,发布了MotorolaedgeS,全球首发高通骁龙870处理器,并把起售价拉到了1999元,一个为什么第三次人工智能浪潮需要可信AI?2021世界人工智能大会进行至第二天,华为京东蚂蚁等企业,中国信通院中科院中英人工智能伦理与治理研究中心等研究结构清华复旦交大浙大等高校,共同发出了一份倡议促进可信人工智能发展。对光纤入户后,光猫怎么放?现在的新小区都已是光纤入户,而光纤入户后都会放进开发商预留的多媒体箱内,而多媒体箱都会在进门口位置,特别是有玄关的家庭来说,都会考虑全屋WIFI网络信号好不好的问题,那问题来了光纤2021年有哪些画质出众的入门级电视?我把票投给这三款不知不觉,2021年已经过去一半。回顾上半年的智能市场,可以发现智能电视很热闹。除了索尼TCLLG等传统电视品牌外,小米OPPO华为等新兴互联网品牌也给我们带来了多款产品力强大的电BeatsStudioBuds对比AirPodsPro都有哪些差异?苹果公司在6月推出了新的BeatsStudioBuds,这是一款售价149美元的降噪耳塞,比AirPods和AirPodsPro更小且无茎。AirPodsPro和BeatsStud三星S22Ultra曝光屏下摄像头1亿像素骁龙895,机皇坐实了三星在中国市场销量很差,但是在全球市场销量仍然高居第一,原本国产机华为有机会在短时间内超越三星,然而因为遭到供应链断货,现在荣耀也脱离了华为,中国第一都拿不到了,更别说跟三星竞争全显卡暴跌,黄牛哭惨一台4K光追游戏电脑只要2万啦币崩啦,矿塌啦,黄牛们,爆仓啦!大概近期DIY圈子里都是这样的喜讯,不可一世的显卡纷纷降价,60回归2000价位,60Ti直逼3000大关,而3080Ti也回归到8999(丐)12