-
Notifications
You must be signed in to change notification settings - Fork 155
/
Copy pathmetric_config_sample.yaml
181 lines (180 loc) · 7.23 KB
/
metric_config_sample.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# ---- SAMPLE YAML OF METRIC CONFIG FILE ---- #
steps:
# What will be the name of the table that represents the output of this step
- dataFrameName: table1
# Inline SQL
sql:
SELECT * FROM source
ignoreOnFailures: true
- dataFrameName: table2
# Use SQL from an external file
file: test.sql
- dataFrameName: table3
# Run custom code from the below object
classpath: com.example.CustomCode
# Send the params below to the custom code
params:
param1: value
param2: value
output:
# What table should we send to the output from the above steps
- dataFrameName: table1
# Defines the output type
outputType: File
# When using outputType: File we can set any custom format we like that's supported by spark
format: parquet
# Optional when using named outputs
name: fileDir1
# Optional repartition the output before writing it
repartition: 100
# Create a single partition from the requested table
coalesce: true
# Specifies options specific to each writer
outputOptions:
# defines the same mode (append/overwrite/ignore)
saveMode: Overwrite
# path of the file (can be omitted if using tableName and writing to warehouse)
path: file.parquet
# Optional: If enabled, adding current timestamp suffix to path of the file
createUniquePath: true
# Optional: Abort writing an empty df to output OR updating external table in Hive with empty parquets.
protectFromEmptyOutput: true
# partition by the following columns
partitionBy:
- col1
- col2
# when in streaming mode, micro batch each X time
triggerDuration: 10 seconds
# save output to hive metastore (or any other catalog provider)
tableName: hive_table1
# when a table already exists in the catalog always update its schema with the dataframe's schema (default is true)
alwaysUpdateSchemaInCatalog: true
# you can add below any option supported by file outputs (take a look at Spark's documentation)
extraOptions:
multiLine: true
- dataFrameName: table1
# All options in file output are available for Parquet as well
outputType: Parquet
- dataFrameName: table1
# All options in file output are available for CSV as well
outputType: CSV
- dataFrameName: table1
# All options in file output are available for JSON as well
outputType: JSON
- dataFrameName: table1
outputType: Cassandra
outputOptions:
saveMode: Overwrite
dbKeySpace: keyspace
dbTable: table
- dataFrameName: table1
outputType: Instrumentation
outputOptions:
# This will be used to specify a column that will be treated as a string and be used as a value.
# by default valuecolumn will be the last column of the schema.
valueColumn: col1
# When using influx instrumentation, use the time column in the table to set the time (by default it's now)
timeColumn: col3
- dataFrameName: table1
outputType: JDBC
outputOptions:
saveMode: Overwrite
dbTable: table_in_db
# Optional: run this after writing to JDBC
postQuery: CREATE INDEX...
- dataFrameName: table1
outputType: JDBCQuery
outputOptions:
# Run this query for each line in the table
query:
INSERT INTO table
(col1, col2)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE value = VALUES(value)
# Size of the insert batch before committing
maxBatchSize: 10
# This will repartition the data if it doesn't have enough partitions
minPartitions: 5
# This will repartition the data if it has too many partitions
maxPartitions: 10
- dataFrameName: table1
outputType: Kafka
outputOptions:
# Name of the topic in kafka
topic: topic
# Column in table used as the key in kafka
keyColumn: col1
# This column will be sent as the payload of the kafka message
valueColumn: col2
# In streaming mode set to append/update/complete default is append
outputMode: append
# In streaming mode set the trigger type (Once/Processing time)
triggerType: Once
# In streaming mode if trigger is processing time or was not set, how long between micro batches
triggerDuration: 30 seconds
- dataFrameName: table1
outputType: Redis
outputOptions:
keyColumn: col1
- dataFrameName: table1
outputType: Redshift
outputOptions:
saveMode: Overwrite
dbTable: table
# Specify any copy options if required
extraCopyOptions: TRUNCATECOLUMNS ACCEPTINVCHARS BLANKSASNULL COMPUPDATE ON
# After write is completed run the following query
postActions: GRANT SELECT ON TABLE table TO GROUP group
maxStringSize: 1000
- dataFrameName: table1
outputType: Segment
outputOptions:
eventType: track
eventName: event
# Since segment limits the number of events per second, we need to sleep and write in small enough batches
sleep: 1000
batchSize: 30
- dataFrameName: table1
outputType: Elasticsearch
outputOptions:
# The index name to write to
resource: index1
# type, default is _doc
indexType:
# The saveMode for the write operation
saveMode: Append
# The write Operation to preform (index/create/upsert/update)
writeOperation: Upsert
# The mappingId by which to update/upsert a document by (not mandatory given a document Id exists)
mappingId: myId
# Enables dynamic indexing by adding a timestamp suffix
addTimestampToIndex: True
# you can add below any option supported by "Elasticsearch for Apache Hadoop"
extraOptions:
es.index.auto.create: false
- outputType: Hudi
# Optional: If enabled, lag between current timestamp and max dataframe's time column (configurable) will be reported.
reportLag: true
# Optional: Column name that will be used for calculating the lag, supported types: (Long, Timestamp, Integer), must be provided if "reportLag" is enabled.
reportLagTimeColumn: Option[String]
# Optional: Represents time durations of the "reportLagTimeColumn" (Long or Integer), at a given unit of granularity, optional values: [MILLISECONDS (Default), SECONDS]
reportLagTimeColumnUnits: Option[String]
outputOptions:
path: path
# The key to use for upserts
keyColumn: userkey
# This will be used to determine which row should prevail (newer timestamps will win)
timeColumn: ts
# Partition column - note that hudi support a single column only, so if you require multiple levels of partitioning you need to add / to your column values
partitionBy: date
# Hive table to save the results to
tableName: test_table
# Optional: Table name that will be used for register hudi metrics
hudiTableName: test_table_hudi
# Optional: Mapping of the above partitions to hive (for example if above is yyyy/MM/dd than the mapping should be year,month,day) - (default will be split by / of the partitionBy column)
hivePartitions: year,month,day
# Optional: Save mode to use (default to Append)
saveMode: Overwrite
# Optional: extra options (http://hudi.incubator.apache.org/configurations.html)
extraOptions:
option1: value1