# Incremental JDBC input for MariaDB: polls the `users` table every 6 hours
# and fetches only rows modified since the last successful run.
jdbc {
  clean_run => false
  # Forward slashes are safest on Windows: backslashes inside double-quoted
  # Logstash strings can be treated as escape sequences (also keeps the path
  # style consistent with last_run_metadata_path below).
  jdbc_driver_library => "E:/logstash/mariadb-java-client.jar"
  # The "Java::" prefix is the documented workaround for drivers (such as
  # MariaDB's) that Logstash/JRuby cannot otherwise load by class name.
  jdbc_driver_class => "Java::org.mariadb.jdbc.Driver"
  jdbc_connection_string => "jdbc:mariadb://xxxxx:1234/xyz"
  jdbc_user => "your_username"
  jdbc_password => "your_password"
  # Track last processed date/time
  last_run_metadata_path => "E:/logstash/last_run_metadata.yml"
  tracking_column => "update_date"
  tracking_column_type => "timestamp" # Make sure it's a timestamp field
  use_column_value => true
  statement => "SELECT * FROM users WHERE update_date > :sql_last_value"
  # Connection pool configuration: validate the connection before use.
  jdbc_validate_connection => true
  # NOTE: `jdbc_validation_query` is NOT a supported option of the jdbc input
  # plugin (the validation query is handled internally by Sequel), so it has
  # been removed; only jdbc_validate_connection and jdbc_validation_timeout
  # are configurable.
  jdbc_validation_timeout => 120
  # Retry logic to reconnect if needed
  connection_retry_attempts => 5
  connection_retry_attempts_wait_time => 2
  # Run query every 6 hours (at 12 AM, 6 AM, 12 PM, 6 PM)
  schedule => "0 */6 * * *"
}
Key Components:
- last_run_metadata_path:
- This is the file where Logstash will store the last timestamp of the query.
- It helps in tracking the last successful query execution time.
- tracking_column => "update_date":
- Defines the column to track for incremental querying. In this case, it’s the update_date column.
- tracking_column_type => "timestamp":
- Specifies that the update_date is a timestamp column, so Logstash can correctly compare it with the last value.
- statement => "SELECT * FROM users WHERE update_date > :sql_last_value":
- The query fetches records where update_date is greater than the `:sql_last_value` stored in the last_run_metadata_path file.
- use_column_value => true:
- This tells Logstash to use the last timestamp stored in :sql_last_value to filter records.
How It Works:
- First Run: no last-run timestamp exists yet, so :sql_last_value defaults to the epoch (1970-01-01 00:00:00) and Logstash fetches all records.
- Subsequent Runs: Logstash will only fetch records where the update_date is greater than the timestamp of the last successful run stored in last_run_metadata_path
Monitor last_run_metadata.yml:
- Check this file to ensure that the last execution time is correctly recorded.
# Incremental JDBC input for Oracle (draft — some settings elided below).
input {
  jdbc {
    clean_run => false
    # Forward slashes avoid escape-sequence ambiguity in double-quoted strings.
    jdbc_driver_library => "E:/logstash/ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@xxxxx:1234/xyz"
    # ... (credentials, statement, tracking options elided) ...

    # Connection pool configuration. Validate connection before use.
    jdbc_validate_connection => true
    # How often to validate a connection (in seconds)
    jdbc_validation_timeout => 120
    # NOTE: a `jdbc_validation_query` option (e.g. "SELECT 1 FROM DUAL") is
    # NOT supported by the jdbc input plugin — the validation mechanism is
    # internal to Sequel, so no such line should be added here.
    # Maximum number of times to try connecting to database
    connection_retry_attempts => 5
    # Number of seconds to sleep between connection attempts
    connection_retry_attempts_wait_time => 2

    # Extra options passed straight through to Sequel.connect; see
    # https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc
    sequel_opts => {
      login_timeout => "60"
      prefetch_rows => "1000"
      # Properties handed to the JDBC driver itself.
      jdbc_properties => {
        "defaultRowPrefetch" => "1000"       # Oracle thin-driver property
        "tcpKeepAlive" => "true"             # Oracle thin-driver property
        "oracle.net.READ_TIMEOUT" => "5000"  # Oracle thin-driver property
        # NOTE(review): the entries below are likely ineffective with the thin
        # driver: "loginTimeout" is a DriverManager/DataSource setting,
        # "inactiveConnectionTimeout" / "timeoutCheckInterval" belong to
        # Oracle UCP (a connection pool, not the driver), and
        # "validationQuery" is a DBCP/Tomcat-pool property — confirm before
        # relying on them.
        "loginTimeout" => "60"
        "inactiveConnectionTimeout" => "120"
        "timeoutCheckInterval" => "120"
        "validationQuery" => "SELECT 1 FROM DUAL"
      }
    }
    # ... (other settings elided) ...
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "projects"
    # Stable per-row id so re-runs update existing documents instead of
    # creating duplicates; combined with doc_as_upsert this gives idempotent
    # indexing.
    document_id => "%{project_id}"
    # NOTE: `document_type` has been removed here — mapping types are
    # deprecated since Elasticsearch 6.x/7.x and removed in 8.x; the output
    # defaults to "_doc" automatically.
    doc_as_upsert => true
    http_compression => true
  }
  # Echo each event to the console for debugging.
  stdout {
    codec => rubydebug
  }
}
또는
# Incremental JDBC input for MySQL: runs every minute and fetches only
# student rows whose id is greater than the last one seen.
input {
jdbc {
# MySQL DB jdbc connection string to our database, softwaredevelopercentral
jdbc_connection_string => "jdbc:mysql://localhost:3306/softwaredevelopercentral?autoReconnect=true&useSSL=false"
# The user we wish to execute our statement as
jdbc_user => "root"
# The user password (empty here for a local example — never commit real credentials)
jdbc_password => ""
# The path to our downloaded jdbc driver
jdbc_driver_library => "D:\Programs\MySQLJava\mysql-connector-java-6.0.6.jar"
# The name of the driver class for MySQL DB
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# Run the query every minute (cron syntax)
schedule => "* * * * *"
# Only rows beyond the last tracked studentid; ORDER BY keeps the tracked
# column monotonically increasing so no rows are skipped between runs.
statement => "SELECT * FROM student WHERE studentid > :sql_last_value order by studentid"
use_column_value => true
# tracking_column_type defaults to "numeric", which matches studentid.
tracking_column => "studentid"
}
}
output {
  # Print each event as one JSON document per line for debugging.
  stdout { codec => json_lines }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "students"
    # NOTE: `document_type` has been removed here — mapping types are
    # deprecated since Elasticsearch 6.x/7.x and removed in 8.x; the output
    # defaults to "_doc" automatically.
    # Stable per-row id so re-runs update existing documents instead of
    # creating duplicates.
    document_id => "%{studentid}"
  }
}
'제품 > ELK' 카테고리의 다른 글
| webhook 테스트 (0) | 2025.04.27 |
|---|---|
| syslog 분석 중 (0) | 2025.04.02 |
| Webhook 커넥터 테스트 (0) | 2025.02.22 |
| 데이터 가져오기 (0) | 2025.02.15 |
| 로그스태시로 데이터베이스에서 레코드 가져오기 (0) | 2025.02.13 |