
Logstash jdbc input plugin

by 헬로웬디 2025. 3. 27.


input {
    jdbc {
        clean_run => false
        jdbc_driver_library => "E:/logstash/mariadb-java-client.jar"
        jdbc_driver_class => "Java::org.mariadb.jdbc.Driver"
        jdbc_connection_string => "jdbc:mariadb://xxxxx:1234/xyz"
        jdbc_user => "your_username"
        jdbc_password => "your_password"

        # Track the last processed update_date value
        last_run_metadata_path => "E:/logstash/last_run_metadata.yml"
        tracking_column => "update_date"
        tracking_column_type => "timestamp"  # update_date must be a date/time column
        use_column_value => true
        # ORDER BY matters: the value saved for the next run comes from the last row returned
        statement => "SELECT * FROM users WHERE update_date > :sql_last_value ORDER BY update_date"

        # Connection pool configuration: validate connections before use
        jdbc_validate_connection => true
        jdbc_validation_timeout => 120  # in seconds
        # The plugin has no jdbc_validation_query setting, so no custom validation query is set here

        # Maximum connection attempts and the wait (in seconds) between them
        connection_retry_attempts => 5
        connection_retry_attempts_wait_time => 2

        # Run the query every 6 hours (at 12 AM, 6 AM, 12 PM, 6 PM)
        schedule => "0 */6 * * *"
    }
}
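The `*/6` step in the hour field is what yields the four run times listed in the comment. As a sketch, here is how a standard five-field cron step expands, using a hypothetical `expand_cron_field` helper. Logstash itself hands `schedule` to rufus-scheduler; this only illustrates the basic syntax and does not cover rufus-scheduler's extensions.

```python
from __future__ import annotations


def expand_cron_field(field: str, lo: int, hi: int) -> list[int]:
    """Expand one cron field ('*', '*/n', 'a', 'a-b', 'a-b/n', 'a/n', comma lists)."""
    values: set[int] = set()
    for part in field.split(","):
        step = 1
        has_step = "/" in part
        if has_step:
            part, step_text = part.split("/", 1)
            step = int(step_text)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(part)
            end = hi if has_step else start  # 'a/n' runs from a up to the maximum
        values.update(range(start, end + 1, step))
    return sorted(values)


# Only the minute and hour fields matter for "0 */6 * * *"
minute_field, hour_field = "0 */6 * * *".split()[:2]
for hour in expand_cron_field(hour_field, 0, 23):
    for minute in expand_cron_field(minute_field, 0, 59):
        print(f"{hour:02d}:{minute:02d}")
```

The loop prints one line per run time; the hour field alone expands to 0, 6, 12 and 18.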

Key Components:

  1. last_run_metadata_path:
    • The file where Logstash persists :sql_last_value between runs.
    • With use_column_value => true, it holds the update_date value of the last row processed, not the time the query ran.
  2. tracking_column => "update_date":
    • Defines the column to track for incremental querying. In this case, it’s the update_date column.
  3. tracking_column_type => "timestamp":
    • Specifies that update_date is a timestamp column (the other option, and the default, is "numeric"), so :sql_last_value is stored and compared as a date/time.
  4. statement => "SELECT * FROM users WHERE update_date > :sql_last_value":
    • The query fetches records whose update_date is greater than :sql_last_value, the value loaded from the last_run_metadata_path file. Ordering by update_date ensures the last row returned carries the highest value.
  5. use_column_value => true:
    • Tells Logstash to set :sql_last_value from the tracking column of the last row returned, instead of from the time of the last run.
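To make items 3 to 5 concrete, here is a minimal Python model of the value that ends up in last_run_metadata_path. It is an illustration under stated assumptions, not the plugin's code: `initial_sql_last_value` and `next_sql_last_value` are hypothetical names, and the behaviour modelled (starting at 0 or 1970-01-01, then keeping the tracking column of the last row) follows the plugin's documentation as I understand it.

```python
from datetime import datetime, timezone
from typing import Any, Iterable, Mapping

# Documented starting point for timestamp tracking before any run.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def initial_sql_last_value(tracking_column_type: str) -> Any:
    """0 for numeric tracking, 1970-01-01 for timestamp tracking."""
    return 0 if tracking_column_type == "numeric" else EPOCH


def next_sql_last_value(
    rows: Iterable[Mapping[str, Any]],
    current: Any,
    use_column_value: bool,
    tracking_column: str,
    run_time: datetime,
) -> Any:
    """Model of the value saved after a successful run (hypothetical helper)."""
    if not use_column_value:
        # Without use_column_value, the saved value is the time the query was executed.
        return run_time
    value = current  # no rows returned: the previous value is kept
    for row in rows:
        # Overwritten on every row, so only the LAST row's value survives.
        value = row[tracking_column]
    return value


def day(n: int) -> datetime:
    return datetime(2025, 3, n, tzinfo=timezone.utc)


ordered = [{"update_date": day(1)}, {"update_date": day(3)}]
unordered = [{"update_date": day(3)}, {"update_date": day(1)}]
start = initial_sql_last_value("timestamp")
print(next_sql_last_value(ordered, start, True, "update_date", day(27)))    # 2025-03-03 00:00:00+00:00
print(next_sql_last_value(unordered, start, True, "update_date", day(27)))  # 2025-03-01 00:00:00+00:00
```

The unordered case saves a value lower than the newest row, so the next run fetches the 2025-03-03 row again; that is why the statement should order by the tracking column.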

How It Works:

  • First Run: :sql_last_value starts at 1970-01-01 00:00:00 (or 0 for numeric tracking), so Logstash fetches every record.
  • Subsequent Runs: Logstash fetches only records whose update_date is greater than the value saved in last_run_metadata_path, that is, the update_date of the last row from the previous successful run.
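The two runs described above can be reproduced end to end with an in-memory SQLite table. This is a sketch under assumptions: SQLite stands in for MariaDB, dates are ISO strings so that text comparison matches time order, and `run_once` is a hypothetical helper that mimics one scheduled execution with use_column_value => true.

```python
import sqlite3


def run_once(conn: sqlite3.Connection, sql_last_value: str):
    """One scheduled run: fetch rows newer than sql_last_value, return (rows, new value)."""
    rows = conn.execute(
        "SELECT id, update_date FROM users WHERE update_date > ? ORDER BY update_date",
        (sql_last_value,),
    ).fetchall()
    # Keep the tracking column of the last row; keep the old value if nothing came back.
    new_value = rows[-1][1] if rows else sql_last_value
    return rows, new_value


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, update_date TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "2025-03-01 10:00:00"), (2, "2025-03-02 10:00:00")],
)

last = "1970-01-01 00:00:00"        # value before the first run
first, last = run_once(conn, last)  # first run: every row is newer than 1970

# Between runs, one row is inserted and one existing row is updated.
conn.execute("INSERT INTO users VALUES (3, '2025-03-03 10:00:00')")
conn.execute("UPDATE users SET update_date = '2025-03-03 11:00:00' WHERE id = 1")

second, last = run_once(conn, last)  # second run: only rows changed since the saved value
print([r[0] for r in first], [r[0] for r in second], last)
```

The strict `>` comparison means a row committed later with exactly the saved timestamp would be skipped; that trade-off comes with this pattern.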

Monitor last_run_metadata.yml:

  • Check this file after a run to confirm that the latest update_date value has been recorded.
input {
	jdbc {
		clean_run => false
		jdbc_driver_library => "E:/logstash/ojdbc8.jar"
		jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
		jdbc_connection_string => "jdbc:oracle:thin:@xxxxx:1234/xyz"
		...
		# Connection pool configuration: validate connections before use
		jdbc_validate_connection => true
		# How often to validate a connection (in seconds)
		jdbc_validation_timeout => 120
		# jdbc_validation_query => "SELECT 1 FROM DUAL"  (not a jdbc input setting, so leave it commented out)
		# Maximum number of times to try connecting to the database
		connection_retry_attempts => 5
		# Number of seconds to sleep between connection attempts
		connection_retry_attempts_wait_time => 2

		# Sequel connection options:
		# https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc
		sequel_opts =>
		{
			login_timeout => "60"
			prefetch_rows => "1000"
			# Passed to the driver as JDBC connection properties. Verify each key against the
			# Oracle JDBC documentation: defaultRowPrefetch and oracle.net.READ_TIMEOUT are
			# Oracle driver properties, while validationQuery is a connection-pool setting.
			jdbc_properties =>
			{
				"defaultRowPrefetch" => "1000"
				"loginTimeout" => "60"
				"inactiveConnectionTimeout" => "120"
				"timeoutCheckInterval" => "120"
				"tcpKeepAlive" => "true"
				"oracle.net.READ_TIMEOUT" => "5000"
				"validationQuery" => "SELECT 1 FROM DUAL"
			}
		}
	...
	}
}

output {
	elasticsearch {
		hosts => ["http://localhost:9200"]
		index => "projects"
		document_id => "%{project_id}"
		# doc_as_upsert only takes effect in update mode
		action => "update"
		doc_as_upsert => true
		http_compression => true
		# document_type omitted: mapping types are deprecated in Elasticsearch 7.x and removed in 8.x
	}
	stdout {
		codec => rubydebug
	}
}
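Setting `document_id => "%{project_id}"` makes a row that is fetched again update the same Elasticsearch document instead of creating a duplicate. A simplified in-memory model, assuming hypothetical helpers `sprintf_ref` (a tiny stand-in for Logstash's `%{field}` reference) and `upsert` (a dict standing in for the index, not the Elasticsearch API):

```python
import re
from typing import Any, Dict


def sprintf_ref(template: str, event: Dict[str, Any]) -> str:
    """Simplified %{field} substitution for top-level fields.

    A missing field leaves the reference text unchanged, as Logstash does.
    """
    return re.sub(
        r"%\{([^}]+)\}",
        lambda m: str(event[m.group(1)]) if m.group(1) in event else m.group(0),
        template,
    )


def upsert(index: Dict[str, Dict[str, Any]], doc_id: str, doc: Dict[str, Any]) -> None:
    """Merge into the existing document, or create it if absent (top-level merge only)."""
    index.setdefault(doc_id, {}).update(doc)


index: Dict[str, Dict[str, Any]] = {}
events = [
    {"project_id": 7, "name": "alpha", "status": "open"},
    {"project_id": 8, "name": "beta", "status": "open"},
    {"project_id": 7, "name": "alpha", "status": "closed"},  # same row fetched again later
]
for event in events:
    upsert(index, sprintf_ref("%{project_id}", event), event)

print(len(index), index["7"]["status"])
```

Three events produce two documents, and project 7 ends up with the later status. A row without project_id would get the literal ID `%{project_id}`, so every such row would collapse into one document.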

Or, a MySQL example that tracks a numeric column:

input {
    jdbc {
        # MySQL JDBC connection string for the softwaredevelopercentral database
        jdbc_connection_string => "jdbc:mysql://localhost:3306/softwaredevelopercentral?autoReconnect=true&useSSL=false"
        # The user to execute the statement as
        jdbc_user => "root"
        # The user's password
        jdbc_password => ""
        # Path to the downloaded JDBC driver
        jdbc_driver_library => "D:/Programs/MySQLJava/mysql-connector-java-6.0.6.jar"
        # Driver class name for MySQL
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        # Run every minute
        schedule => "* * * * *"
        # Our query; ordering by studentid makes the last row carry the highest ID
        statement => "SELECT * FROM student WHERE studentid > :sql_last_value order by studentid"
        use_column_value => true
        # tracking_column_type defaults to "numeric", so :sql_last_value starts at 0
        tracking_column => "studentid"
    }
}

output {
    stdout { codec => json_lines }
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "students"
        # document_type omitted: custom mapping types are not supported in Elasticsearch 8.x
        document_id => "%{studentid}"
    }
}
