范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

笔记九Flink常用的sink方法

  1.1DataSink数据输出
  经过一系列Transformation转换操作后,最后一定要调用Sink操作,才会形成一个完整的DataFlow拓扑。只有调用了Sink操作,才会产生最终的计算结果,这些数据可以写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台。
  1.1.1print 打印
  打印是最简单的一个Sink,通常是用来做实验和测试时使用。如果想让一个DataStream输出打印的结果,直接可以在该DataStream调用print方法。另外,该方法还有一个重载的方法,可以传入一个字符,指定一个Sink的标识名称,如果有多个打印的Sink,用来区分到底是哪一个Sink的输出。  import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.util.Collector;  public class PrintSinkDemo {      public static void main(String[] args) throws Exception {          //local模式默认的并行度是当前机器的逻辑核的数量         Configuration configuration = new Configuration();         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);          int parallelism0 = env.getParallelism();          System.out.println("执行环境默认的并行度:" + parallelism0);          DataStreamSource lines = env.socketTextStream("cs-28-86", 8888);          //获取DataStream的并行度         int parallelism = lines.getParallelism();          System.out.println("SocketSource的并行度:" + parallelism);          lines.print();          //lines.addSink(new MyPrintSink()).name("my-print-sink");          env.execute();       }      public static class MyPrintSink extends RichSinkFunction {          private int indexOfThisSubtask;         @Override         public void open(Configuration parameters) throws Exception {             RuntimeContext runtimeContext = getRuntimeContext();             indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();         }          @Override         public void invoke(String value, Context context) throws Exception {              System.out.println(indexOfThisSubtask + 1 + "> " + value);         }     } }
  下面的结果是WordCount例子中调用print Sink输出在控制台的结果,细心的读者会发现,在输出的单词和次数之前,有一个数字前缀,我这里是1~4,这个数字是该Sink所在subtask的Index + 1。有的读者运行的结果数字前缀是1~8,该数字前缀其实是与任务的并行度相关的,由于该任务是以local模式运行,默认的并行度是所在机器可用的逻辑核数即线程数,我的电脑是2核4线程的,所以subtask的Index范围是0~3,将Index + 1,显示的数字前缀就是1~4了。这里在来仔细的观察一下运行的结果发现:相同的单词输出结果的数字前缀一定相同,即经过keyBy之后,相同的单词会被shuffle到同一个subtask中,并且在同一个subtask的同一个组内进行聚合。一个subtask中是可能有零到多个组的,如果是有多个组,每一个组是相互独立的,累加的结果不会相互干扰。
  1.1.2writerAsText 以文本格式输出
  该方法是将数据以文本格式实时的写入到指定的目录中,本质上使用的是TextOutputFormat格式写入的。每输出一个元素,在该内容后面同时追加一个换行符,最终以字符的形式写入到文件中,目录中的文件名称是该Sink所在subtask的Index + 1。该方法还有一个重载的方法,可以额外指定一个枚举类型的参数writeMode,默认是WriteMode.NO_OVERWRITE,如果指定相同输出目录下有相同的名称文件存在,就会出现异常。如果是WriteMode.OVERWRITE,会将以前的文件覆盖。  import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;  public class WriteSinkDemo {      public static void main(String[] args) throws Exception {          //local模式默认的并行度是当前机器的逻辑核的数量         Configuration configuration = new Configuration();         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);          int parallelism0 = env.getParallelism();          System.out.println("执行环境默认的并行度:" + parallelism0);          DataStreamSource lines = env.socketTextStream("localhost", 8888);          //获取DataStream的并行度         int parallelism = lines.getParallelism();          System.out.println("SocketSource的并行度:" + parallelism);          lines.writeAsText("file:///Users/xing/Desktop/out");          env.execute();       }      public static class MyPrintSink extends RichSinkFunction {          private int indexOfThisSubtask;         @Override         public void open(Configuration parameters) throws Exception {             RuntimeContext runtimeContext = getRuntimeContext();             indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();         }          @Override         public void invoke(String value, Context context) throws Exception {              System.out.println(indexOfThisSubtask + 1 + "> " + value);         }     } }
  1.1.3writeAsCsv 以csv格式输出
  该方法是将数据以csv格式写入到指定的目录中,本质上使用的是CsvOutputFormat格式写入的。每输出一个元素,在该内容后面同时追加一个换行符,最终以csv的形式(类似Excel的格式,字段和字段之间用逗号分隔)写入到文件中,目录中的文件名称是该Sink所在subtask的Index + 1。需要说明的是,该Sink并不是将数据实时的写入到文件中,而是有一个BufferedOutputStream,默认缓存的大小为4096个字节,只有达到这个大小,才会flush到磁盘。另外程序在正常退出,调用Sink的close方法也会flush到磁盘。  DataStream> result = wordAndOne.keyBy(0).sum(1); result.writeAsCsv(path);
  1.1.4writeUsingOutputFormat以指定的格式输出
  该方法是将数据已指定的格式写入到指定目录中,该方法要传入一个OutputFormat接口的实现类,该接口有很多已经实现好了的实现类,并且可以根据需求自己实现,所以该方法更加灵活。writeAsText和writeAsCsv方法底层都是调用了writeUsingOutputFormat方法。  DataStream> result = wordAndOne.keyBy(0).sum(1); result.writeUsingOutputFormat(new TextOutputFormat<>(new Path(path));
  1.1.5writeToSocket输出到网络端口
  该方法是将数据输出到指定的Socket网络地址端口。该方法需要传入三个参数:第一个为ip地址或主机名,第二个为端口号,第三个为数据输出的序列化格式SerializationSchema。输出之前,指定的网络端口服务必须已经启动。  DataStreamSource lines = env.socketTextStream("localhost", 8888); lines.writeToSocket("localhost", 9999, new SimpleStringSchema());
  1.1.6RedisSink
  该方法是将数据输出到Redis数据库中,Redis是一个基于内存、性能极高的NoSQL数据库,数据还可以持久化到磁盘,读写速度快,适合存储key-value类型的数据。Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构的存储。Flink实时计算出的结果,需要快速的输出存储起来,要求写入的存储系统的速度要快,这个才不会造成数据积压。Redis就是一个非常不错的选择。
  首先在maven项目中的pom.xml中添加Redis Sink的依赖。        org.apache.bahir     flink-connector-redis_${scala.binary.version}     1.1-SNAPSHOT 
  接下来就是定义一个类(或者静态内部类)实现RedisMapper即可,需要指定一个泛型,这里是Tuple2,即写入到Redis中的数据的类型,并实现三个方法。第一个方法是getCommandDescription方法,返回RedisCommandDescription实例,在该构造方法中可以指定写入到Redis的方法类型为HSET,和Redis的additionalKey即value为HASH类型外面key的值;第二个方法getKeyFromData是指定value为HASH类型对应key的值;第三个方法geVauleFromData是指定value为HASH类型对应value的值。
  在使用之前,先new FlinkJedisPoolConfig,设置Redis的ip地址或主机名、端口号、密码等。然后new RedisSink将准备好的conf和RedisWordCountMapper实例传入到其构造方法中,最后调用DataStream的addSink方法,将new好的RedisSink作为参数传入。  import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; import org.apache.flink.util.Collector;  /**  * 从指定的socket读取数据,对单词进行计算,将结果写入到Redis中  */ public class RedisSinkDemo {      public static void main(String[] args) throws Exception {          //创建Flink流计算执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          //创建DataStream         //Source         DataStreamSource lines = env.socketTextStream("cs-28-86", 8888);          //调用Transformation开始         //调用Transformation         SingleOutputStreamOperator> wordAndOne = lines.flatMap(new FlatMapFunction>() {             @Override             public void flatMap(String line, Collector> collector) throws Exception {                 String[] words = line.split(" ");                 for (String word : words) {                     //new Tuple2(word, 1)                     collector.collect(Tuple2.of(word, 1));                 }             }         });          //分组         KeyedStream, String> keyed = wordAndOne.keyBy(new KeySelector, String>() {             @Override             public String getKey(Tuple2 tp) throws Exception {                 return tp.f0;             }         });          //聚合         SingleOutputStreamOperator> summed = keyed.sum(1);          //Transformation结束          //调用Sink         //summed.addSink()         FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setDatabase(0).build();          summed.addSink(new RedisSink>(conf, new RedisWordCountMapper()));         //启动执行         env.execute("StreamingWordCount");      }      public static class RedisWordCountMapper implements RedisMapper> {          @Override         public RedisCommandDescription getCommandDescription() {             return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");         }          @Override         public String getKeyFromData(Tuple2 data) {             return data.f0;         }          @Override         public String getValueFromData(Tuple2 data) {             return data.f1.toString();         }     }  }
  1.1.7KafkaSink
  在实际的生产环境中,经常会有一些场景,需要将Flink处理后的数据快速地写入到一个分布式、高吞吐、高可用、可用保证Exactly Once的消息中间件中,供其他的应用消费处理后的数据。Kafka就是Flink最好的黄金搭档,Flink不但可以从Kafka中消费数据,还可以将处理后的数据写入到Kafka,并且吞吐量高、数据安全、可以保证Exactly Once等。
  Flink可以和Kafka多个版本整合,比如0.11.x、1.x、2.x等,从Flink1.9开始,使用的是kafka 2.2的客户端,所以这里使用kafka的版本是2.2.2,并且使用最新的API。
  下面的例子就是将数据写入到Kafka中,首先要定义一个类实现KafkaSerializationSchema接口,指定一个泛型,String代表要写入到Kafka的数据为String类型。该类的功能是指定写入到Kafka中数据的序列化Schema,需要重写serialize方法,将要写入的数据转成二进制数组,并封装到一个ProducerRecord中返回。  import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;  public class KafkaSinkDemo {      public static void main(String[] args) throws Exception {          //local模式默认的并行度是当前机器的逻辑核的数量         Configuration configuration = new Configuration();         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);          int parallelism0 = env.getParallelism();          System.out.println("执行环境默认的并行度:" + parallelism0);          DataStreamSource lines = env.socketTextStream("cs-28-86", 8888);          //获取DataStream的并行度         int parallelism = lines.getParallelism();          System.out.println("SocketSource的并行度:" + parallelism);          //lines.writeAsText("file:///Users/xing/Desktop/out");          FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(                 "cs-28-87:9092,cs-28-88:9092,cs-28-89:9092", "wordcount18", new SimpleStringSchema()         );          lines.addSink(kafkaProducer);          env.execute();      }  }
  启动nc –lk 8888 ,然后启动上述代码程序;
  在nc窗口中输入数据,使用kafka可以消费到;
  kafka消费wordcount18的topic:
  [root@cs-28-88 ~]# kafka-console-consumer --zookeeper cs-28-88:2181 --topic wordcount18
  然后将Kafka相关的参数设置到Properties中,再new FlinkKafkaProducer,将要写入的topic名称、Kafka序列化Schema、Properties和写入到Kafka的Semantic语义作为FlinkKafkaProducer构造方法参数传入。最好调用addSink方法将FlinkKafkaProducer的引用传入到该方法中。虽然下面的代码指定了EXACTLY_ONCE语义,但是没有开启Checkpointing,是没法实现的。具有怎样实现Exactly Once,会在后面原理深入的章节进行讲解。
  1.1.8StreamFileDataSink
  实时处理的数据,有一些场景要输出到其他分布式文件系统中,比如Hadoop HDFS、Amazon S3 (Simple Storage Service)、Aliyun OSS(Object Storage Service)等。因为这些分布式文件系统都具有高可用、可扩展、多副本、存储海量数据等特点。存储到分布式文件系统的数据,就可以做一些离线的数据分析,比如离线的数仓、数据挖掘、机器学习等。
  从Flink 1.9开始,原来的Bucketing Sink已经标记为过时,在未来的版本将会被移除。推荐使用StreamFileDataSink,该Sink不但可以将数据写入到各种文件系统中,可以保证Exacly Once语义,还支持以列式存储的格式写入,功能更强大。
  下面的例子是将数据写入到HDFS中,首先在maven项目的pom.xml文件引入HDFS文件系统的依赖:       org.apache.flink     flink-connector-filesystem_2.12     1.12-SNAPSHOT         org.apache.hadoop     hadoop-client     2.6.0 
  通过DefaultRollingPolicy这个工具类,指定文件滚动生成的策略。这里设置的文件滚动生成策略有两个,一个是距离上一次生成文件时间超过30秒,另一个是文件大小达到100 mb。这两个条件只要满足其中一个即可滚动生成文件。然后StreamingFileSink.forRowFormat方法将文件输出目录、文件写入的编码传入,再调用withRollingPolicy关联上面的文件滚动生成策略,接着调用build方法构建好StreamingFileSink,最后将其作为参数传入到addSink方法中。
  1.1.9 JDBCSink  package com.bigdata.sink;  import com.bigdata.utils.DateUtil; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;  import java.util.Arrays; import java.util.List;  /****  * @author songshiming  * @date 2022/12/10  * @desc  */   public class SinkToMySql2 {     public static void main(String[] args) throws Exception {          System.out.println("start="+ DateUtil.getCurrentdatetime());         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //        DataStreamSource lineDataStreamSource = env.readTextFile("input/200.csv"); DataStreamSource lineDataStreamSource = env.readTextFile("E://t1_trxrecord_20220821_V2.csv","gb2312");          SingleOutputStreamOperator> wordList =                 lineDataStreamSource.flatMap((String line, Collector> out) -> { //                    line = new String(line.getBytes("utf-8"),"gbk"); //                    System.out.println("line="+line);             String[] words = line.split(",");             out.collect(Arrays.asList(words));          }).returns(Types.LIST(Types.STRING));          String sqlStr ="INSERT INTO db_test.test_t1(TRXID, PARENT_TRXID, MERCHANT_NO, BRANCH_OFFICE, EXPAND_ORG, MAINTENANCE_ORG, STORE_CD, TERMINAL_NO, TRADE_TIME, SETTLEMENT_DATE, PRODUCT_NAME, TRADE_TP, TRADE_STA, TRADE_CARD_NO, TRADE_CARD_TP, ISSUER_CODE, TRADE_INIT_AMT, TRADE_AMT, BILLING_CYCLE, FEE_COLLECTION_STA, MER_FEE, SYSTEM_COST, BRAND_FEE, NET_PROFIT, MCC18, MCC42, ACCOUNT_ID, BUSINESS_TYPE, ORDER_NO, OTHER_MER_NO, OTHER_ACCOUNT_NO, SUBMIT_WAYS, TERMINAL_CODE, TERMINAL_BATCH, TERMINAL_TRACK_NO, TRADE_REFERENCE_NO, CHANNEL_NO, CHANNEL_MER_NO, TRADE_REMARKS, TRADE_ABSTRACT, TRADE_IP, CHANNEL_RET_CODE, SUBMIT_TIME, ERR_CODE, ERR_MSG, CHANNEL_TRADE_TP, INSTALLMENT_SUBSIDY_FEE, SUBSIDY_INFO, DCC_CURRENCY, DCC_AMT, DCC_EXCHANGE_RATE, PACKAGE_FEE, APP_ID, PACKAGE_ID)  " +                 "VALUES  " +                 "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" ;          wordList.addSink(JdbcSink.sink(                 sqlStr,                 ((statement, word) -> {                     for (int i = 0; i < word.size(); i++) {                         String str = word.get(i).trim();                         statement.setString(i+1, str );                     }                 }),                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()                         .withUrl("jdbc:mysql://192.168.0.11:3306/db_test?useUnicode=true&characterEncoding=utf8")                         .withDriverName("com.mysql.jdbc.Driver")                         .withUsername("root")                         .withPassword("123456")                         .build()         ));          env.execute();         System.out.println("start="+ DateUtil.getCurrentdatetime());     } }

考编的小镇做题家们考10次上岸,靠做题走向城市深燃(shenrancaijing)原创作者王敏邹帅宛其李秋涵编辑李秋涵最近,由于明星考编争议,小镇做题家再度成为网络热词。这原本出自出身于小城市或县城的学子用来自嘲的梗,现在根据自由市场最后4条大鱼安东尼宝刀未老施罗德依然想要8500万随着自由市场的逐渐恢复平静,当前各支球队的引援补强已经基本到位,但当前自由市场依然还有几位昔日的全明星无人问津,其中还有不乏状元和探花,下面就给大家盘点一下当前自由市场仅剩的的几条我暗恋异父异母的继兄他是我们学校的校草脾气很臭从不让叫他哥哥我暗恋异父异母的继兄。他是我们学校的校草,脾气很臭,从不让我叫他哥哥。大家都在猜,我和他到底是什么关系。六月十日,高考完的暑假。张越在酒吧,程栀在家里。张向群打电话回来问小栀,张越米体国米寻拉诺基亚替代者首选米伦,备选阿坎吉马里潘等人直播吧7月17日讯据米兰体育报报道,国米正积极寻找拉诺基亚的替代者,除了米伦科维奇,国米还在考虑其他的人选,比如阿坎吉塞内西奥梅拉吉奇萨维奇马里潘扎加杜迪亚卡比等人。报道称,国米仍用生命写出了南京大屠杀,36岁身亡的张纯如,让人尊敬文话多多14亿中国人不忍翻阅的一本书,是她写的14亿中国人无法忘记的一段历史伤痕,她用英文记录在南京大屠杀被遗忘的二战浩劫当中,这个女子,执笔为剑,慷慨激昂,令日本外交官落荒而逃她6投拿13分!小身材蕴藏大能量,出色表现回应质疑,值得杜锋信任9580,中国男篮大胜中国台北男篮,拿下一场关键性的胜利。2胜1负,中国男篮以小组第二的身份晋级淘汰赛,将会对阵东道主印度尼西亚。从比赛的过程来看,中国男篮后防线打出了竞争力,胡明科幻照进现实,中国空天航母到底有多强?何时才能服役?在许多的科幻电影当中,我们都可以看到许多具备先进科技的一些科技产品,在这当中也不乏有着航空母舰在太空遨游的画面,这一类航母正是被称之为空天航母。那如果真的能够实现科幻照进现实,我国演员崔龙海葬礼低调举行,子女透露其去世细节,六小龄童致电慰问7月15日,六小龄童发文透露恩师崔龙海于7月14日凌晨1点30分去世,享年95岁。据悉,崔龙海是国家一级演员,素有锡剧猴王一称,他与六小龄童的父亲六龄童及李万春并称三大猴王。崔龙海泪目!王曼昱1天4赛腿伤爆发睡觉都疼醒吃止疼药死磕下混双冠军7月17日消息,2022年WTT球星挑战赛匈牙利布达佩斯站混双决赛,中国组合王楚钦王曼昱以32逆转日本组合张本智和早田希娜夺冠,赛后采访时,王炸组合都在心疼对方的伤病,而王曼昱最终贵州最牛黑老大赵元良,独霸六盘水十余年,组织严密与警方对抗?号称贵州最牛的黑老大赵元良究竟有多嚣张呢?这是赵元良入狱前的一张照片,看着挺老实的一个人,却是贵州六盘水人民提起来都恨得牙痒痒的一个人!别看赵元良只有一米六,但是他却足足有160多杜兰特没人要的背后原因,四个角度详细解析为何这么难在太阳选择匹配步行者给艾顿的顶薪报价合同后,也宣告着他们正式退出杜兰特争夺战,因为他们已经没有能打动篮网的筹码了。而此前有报道,猛龙不愿意用未来核心巴恩斯去换34岁的杜兰特篮网向鹈
国际最新研究发现最大化石花距今近4000万年保存于琥珀中新网北京1月13日电(记者孙自法)施普林格自然旗下开放获取学术期刊科学报告最新发表一篇化石研究论文称,研究人员利用最新图像记录了一朵保存在琥珀中的已知最大化石花,其直径28毫米,全新微型电动车比亚迪海鸥申报图曝光预计起售价在8万左右日前,我们从工信部最新一期产品目录上,获取到比亚迪海鸥的申报图。新车基于e平台3。0打造,预计今年上半年正式发布。外观方面,比亚迪海鸥的外观设计非常灵动,封闭式前脸搭配锐利的大灯组如何让孩子主动爱上学习?学会这些方法,差生逆袭尖子生如何让孩子主动爱上学习?很多家长在督促和监督孩子学习的时候,会感觉到心累。因为有的孩子自律性比较低,家长一会没看住,他的注意力去不在学习上了。因此,如何能让孩子主动学习成了家长的头40眼袋时有时无是怎么回事?到底用什么方法祛眼袋效果好?有很多40的上班族的求美者和我说,自己30岁时眼袋时有时无的,但近几年眼袋却长期存在无法消退了,这是怎么回事?如果你也正在被眼袋问题所困扰,找不到解决办法,那一定要看完这篇文章哦。葱白这样吃,补元气润肠胃葱是厨房常用的一味调料,不管是炒菜煮粥煲汤,加点葱烹饪,味道更香浓。有时候一个葱花就可以撑起一个菜,比如葱爆羊肉。葱的葱白部分也有很好的养生效果,天气寒冷的时候,弄个几根葱白两片生脾虚,可能是因为心火不旺,送你一个中药方,补益心气,改善脾虚脾虚可以说是现代人最普遍的一个现象了,现在的人十个中有九个都脾虚,但是脾虚的原因却各不相同。很多人脾胃不好就去用一些补脾的药,用了很久都没有效果,这是因为中医把人看做一个整体,五脏天然的嘌呤大王是海鲜的5倍,这1素菜早扔掉,你还在吃吗嘌呤是一种有机化合物,分子式CHN,在人体内嘌呤氧化而变成尿酸。人体尿酸过高就会引起痛风,俗称富贵病。一般在男性身上发病,有遗传概率。嘌呤的全名为嘌呤核苷酸,在人体代谢过程中扮演着名誉太看重了不好有些人,他虽说不把钱看在眼里。但他把自己的名誉,看得非常重要。就是普通人,有的也不怕别人说他无能,不怕别人说他穷。但,他怕别人说他的心里不干净。一生做事谨慎,他从不想占任何人的便宜常喝咖啡,到底好不好?医生这5类人最好不要碰,希望你能听劝咖啡是全世界最受欢迎的饮料之一,它正融入中国城市居民的生活。热闹的街区通常都会有几家咖啡店,门缝中偶尔飘出来的香气,让匆匆的行人慢下了脚步,甚至走进去品尝一杯。除了消遣,咖啡也是很阳康后,3种食物多吃2件小事别做!否则易复阳,身体越来越虚,年都过不好最近,阳康越来越多了,地铁商场又恢复到了人挤人的状态。在经历了刀片喉宝娟嗓水泥鼻等一系列症状之后,大家也逐渐恢复过来了。但问题也来了,怎么做才能让身体更快恢复呢?阳康后多吃3种食物研究实现化学反应的立体动力学精准调控化学反应无处不在。如何精确调控化学反应是化学科学研究的核心目标之一。在化工生产过程中,工程师通过添加催化剂改变化学过程的温度与压力等宏观参数,可以在一定程度上控制化学反应,得到所需