• 售前

  • 售后

热门帖子
入门百科

PostgreSQL 数据同步到 ES 的搭建操作

[复制链接]
东边是黄海 显示全部楼层 发表于 2021-10-26 13:54:33 |阅读模式 打印 上一主题 下一主题
安装 python 的 dev 开发包(python-devel)
  1. [root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm
  2. 准备中...       ################################# [100%]
  3. 正在升级/安装...
  4. 1:python-devel-2.7.5-58.el7  ################################# [100%]
  5. [root@rtm2 Packages]# ls
复制代码
安装 multicorn
  1. [root@rtm2 multicorn-1.3.5]# make
  2. Python version is 2.7
  3. gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.c
  4. gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.c
  5. gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.c
  6. gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.c
  7. gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic
  8. .//preflight-check.sh
  9. cp sql/multicorn.sql sql/multicorn--1.3.5.sql
  10. [root@rtm2 multicorn-1.3.5]# make install
  11. Python version is 2.7
  12. ...
复制代码
安装pg-es-fdw-master
  1. [root@rtm2 multicorn-1.3.5]# cd ../pg-es-fdw-master
  2. [root@rtm2 pg-es-fdw-master]# ls
  3. demo.sh dite LICENSE README.md setup.py
  4. [root@rtm2 pg-es-fdw-master]# python setup.py build
  5. running build
  6. running build_py
  7. creating build
  8. creating build/lib
  9. creating build/lib/dite
  10. copying dite/__init__.py -> build/lib/dite
  11. [root@rtm2 pg-es-fdw-master]# python setup.py install
  12. running install
  13. running bdist_egg
  14. running egg_info
  15. creating dite.egg-info
  16. writing dite.egg-info/PKG-INFO
复制代码
安装插件 multicorn
  1. [postgres@rtm2 ~]$ psql
  2. psql (10.3)
  3. Type "help" for help.
  4. postgres=# select * from pg_extension;
  5. extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
  6. ---------+----------+--------------+----------------+------------+-----------+--------------
  7. plpgsql |  10 |   11 | f    | 1.0  |   |
  8. (1 row)
  9. postgres=# CREATE EXTENSION multicorn;
  10. CREATE EXTENSION
  11. postgres=# psql
  12. postgres=# select * from pg_extension;
  13. extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
  14. -----------+----------+--------------+----------------+------------+-----------+--------------
  15. plpgsql |  10 |   11 | f    | 1.0  |   |
  16. multicorn |  10 |   2200 | t    | 1.3.5  |   |
  17. (2 rows)
  18. postgres=# CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn OPTIONS(wrapper 'dite.ElasticsearchFDW');
  19. CREATE SERVER
  20. postgres=#
复制代码
配置 es
  1. [root@rtm2 config]# vi elasticsearch.yml
  2. node.name: "es-node1"
  3. network.host: 192.168.31.121
  4. discovery.zen.ping.unicast.hosts: ["192.168.31.121"]
复制代码
  1. [root@rtm2 config]# vi /etc/sysctl.conf
  2. vm.max_map_count=262144
  3. sysctl -p
  4. [root@rtm2 config]# vi /etc/security/limits.conf
  5. # End of file
  6. root soft nofile 65536
  7. root hard nofile 65536
  8. root soft nproc 4096
  9. root hard nproc 4096
  10. ~
复制代码
启动es
  1. [root@rtm2 bin]# ls
  2. elasticsearch  elasticsearch.in.bat elasticsearch-service-mgr.exe elasticsearch-service-x86.exe plugin.bat
  3. elasticsearch.bat elasticsearch.in.sh elasticsearch-service-x64.exe plugin       service.bat
  4. [root@rtm2 bin]# ./bin/elasticsearch
复制代码
  1. test=# CREATE FOREIGN TABLE pp_es (id bigint,age bigint) SERVER multicorn_es OPTIONS (host
  2. test(# '192.168.31.121', port '9200', node 'es-node1', index 'pp');
  3. CREATE FOREIGN TABLE
  4. test=#
复制代码
创建插入触发器(外部表 pp_es 已在上一步创建)
  1. test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$
  2. test$# BEGIN
  3. test$# INSERT INTO pp_es (id, age) VALUES
  4. test$# (NEW.id, NEW.age);
  5. test$# RETURN NEW;
  6. test$# END;
  7. test$# $def$ LANGUAGE plpgsql;
  8. CREATE FUNCTION
  9. test=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR EACH ROW EXECUTE PROCEDURE index_pp();
  10. CREATE TRIGGER
  11. test=#
复制代码
新增数据测试
  1. test=# insert into pp (id,age) values (1,11);
  2. INSERT 0 1
  3. test=# select * from pp;
  4. id | age
  5. ----+-----
  6. 1 | 11
  7. (1 row)
  8. test=#
复制代码
查看 es 数据
  1. [root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
  2. {
  3. "took" : 104,
  4. "timed_out" : false,
  5. "_shards" : {
  6. "total" : 5,
  7. "successful" : 5,
  8. "failed" : 0
  9. },
  10. "hits" : {
  11. "total" : 2,
  12. "max_score" : 1.0,
  13. "hits" : [ {
  14.   "_index" : "es-node1",
  15.   "_type" : "pp",
  16.   "_id" : "1",
  17.   "_score" : 1.0,
  18.   "_source":{"age": "11"}
  19. }, {
  20.   "_index" : "es-node1",
  21.   "_type" : "pp",
  22.   "_id" : "2",
  23.   "_score" : 1.0,
  24.   "_source":{"age": "22"}
  25. } ]
  26. }
  27. }
  28. [root@rtm2 ~]#
复制代码
创建更新触发器
  1. test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$
  2. BEGIN
  3. UPDATE pp_es SET
  4. id = NEW.id,
  5. age = NEW.age
  6. where id =NEW.id;
  7. RETURN NEW;
  8. END;
  9. $def$ LANGUAGE plpgsql;
  10. CREATE FUNCTION
  11. test=# ^C
  12. test=#
  13. test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCT
  14. test(# FROM NEW.*)EXECUTE PROCEDURE updadeIndex_pp();
  15. CREATE TRIGGER
  16. test=#
复制代码
更新表数据
  1. test=# select * from pp;
  2. id | age
  3. ----+-----
  4. 1 | 11
  5. 2 | 22
  6. 3 | 22
  7. (3 rows)
  8. test=# update pp a set a.age = 33 where a.id = 3;
  9. ERROR: column "a" of relation "pp" does not exist
  10. LINE 1: update pp a set a.age = 33 where a.id = 3;
  11.       ^
  12. test=# update pp set age = 33 where id = 3;
  13. UPDATE 1
  14. test=# select * from pp;
  15. id | age
  16. ----+-----
  17. 1 | 11
  18. 2 | 22
  19. 3 | 33
  20. (3 rows)
  21. test=#
复制代码
在 es 中查询更新后的数据
  1. [root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
  2. {
  3. "took" : 4,
  4. "timed_out" : false,
  5. "_shards" : {
  6. "total" : 5,
  7. "successful" : 5,
  8. "failed" : 0
  9. },
  10. "hits" : {
  11. "total" : 3,
  12. "max_score" : 1.0,
  13. "hits" : [ {
  14.   "_index" : "es-node1",
  15.   "_type" : "pp",
  16.   "_id" : "1",
  17.   "_score" : 1.0,
  18.   "_source":{"age": "11"}
  19. }, {
  20.   "_index" : "es-node1",
  21.   "_type" : "pp",
  22.   "_id" : "2",
  23.   "_score" : 1.0,
  24.   "_source":{"age": "22"}
  25. }, {
  26.   "_index" : "es-node1",
  27.   "_type" : "pp",
  28.   "_id" : "3",
  29.   "_score" : 1.0,
  30.   "_source":{"age": "33"}
  31. } ]
  32. }
  33. }
  34. [root@rtm2 ~]#
复制代码
补充:logstash同步pgsql数据到Elasticsearch
一、logstash 的配置我就不再多说,主要是三部分:input、filter、output 的配置
二、配置步骤
1、input配置
  1. input {
  2. stdin {
  3. }
  4. jdbc {
  5.   jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/world"
  6.   jdbc_user => "postgres"
  7.   jdbc_password => "zhang123"
  8.   jdbc_driver_library => "D:\logstash-6.4.0\bin\pgsql\postgresql-42.2.5.jar"
  9.   jdbc_driver_class => "org.postgresql.Driver"
  10.   jdbc_paging_enabled => "true"
  11.   jdbc_page_size => "300000"
  12.   use_column_value => "true"
  13.   tracking_column => "id"
  14.   statement_filepath => "D:\logstash-6.4.0\bin\pgsql\jdbc.sql"
  15. schedule => "* * * * *"
  16. type => "jdbc"
  17. jdbc_default_timezone =>"Asia/Shanghai"
  18. }
  19. }
复制代码
2、filter配置
  1. filter {
  2. json {
  3.   source => "message"
  4.   remove_field => ["message"]
  5. }
  6. }
复制代码
3、output 配置,就是 elasticsearch 的基本配置
  1. output {
  2. elasticsearch {
  3.   hosts => ["localhost:9200"]
  4.   index => "test_out"
  5. template => "D:\logstash-6.4.0\bin\pgsql\es-template.json"
  6. template_name => "t-statistic-out-logstash"
  7. template_overwrite => true
  8. document_type => "out"
  9.   document_id => "%{id}"
  10. }
  11. stdout {
  12.   codec => json_lines
  13. }
  14. }
复制代码
以上三部分合起来就是整个 logstash 的 jdbc.conf
4、es-template.json 的配置(注意:模板中的 "template" 字段是索引名的匹配模式,需要能匹配 output 中配置的索引名,例如 test_out,否则该模板不会被应用到索引上)
  1. {
  2. "template" : "t-statistis-out-template",
  3. "order":1,
  4. "settings": {
  5.    "index": {
  6.     "refresh_interval": "5s"
  7.    }
  8.   },
  9. "mappings": {
  10.    "_default_": {
  11. "_all" : {"enabled":false},
  12.     "dynamic_templates": [
  13.      {
  14.     "message_field" : {
  15.     "match" : "message",
  16.     "match_mapping_type" : "string",
  17.     "mapping" : { "type" : "string", "index" : "not_analyzed" }
  18.     }
  19.    }, {
  20.     "string_fields" : {
  21.     "match" : "*",
  22.     "match_mapping_type" : "string",
  23.     "mapping" : { "type" : "string", "index" : "not_analyzed" }
  24.     }
  25.    }
  26.     ],
  27.     "properties": {
  28.      "@timestamp": {
  29.       "type": "date"
  30.      },
  31.      "@version": {
  32.       "type": "keyword"
  33.      },     
  34.   "id": {
  35.       "type": "keyword"
  36.      },
  37.   "name": {
  38.       "type": "keyword"
  39.      },
  40.   "pp": {
  41.       "type": "keyword"
  42.      }  
  43.     }
  44.    }
  45.   },
  46.   "aliases": {}
  47. }
复制代码
最后就是下载好 PostgreSQL 的 JDBC 连接驱动(可从官网下载),并准备好自己数据库表中的数据。
启动命令:进入到 logstash 的 bin 目录下。我自己的 logstash 配置都放在 bin 下的 pgsql 目录中(这个目录可以自己随意创建在任何位置):
  1. logstash.bat -f ./pgsql/jdbc.conf
复制代码
以上为个人经验,希望能给大家一个参考,也希望大家多多支持草根技术分享。如有错误或未考虑完全的地方,望不吝赐教。

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作