001 /*
002 * SPDX-License-Identifier: Apache-2.0
003 *
004 * Copyright 2020-2022 Agorapulse.
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package com.agorapulse.micronaut.bigquery;
019
020 import com.google.common.base.CaseFormat;
021 import io.micronaut.core.beans.BeanIntrospection;
022 import io.micronaut.core.beans.BeanIntrospector;
023 import io.micronaut.core.beans.BeanProperty;
024 import io.reactivex.Flowable;
025
026 import java.util.ArrayList;
027 import java.util.Collection;
028 import java.util.Collections;
029 import java.util.LinkedHashMap;
030 import java.util.List;
031 import java.util.Map;
032 import java.util.Optional;
033 import java.util.function.Function;
034 import java.util.stream.Collectors;
035
036 /**
037 * BigQuery service is used to execute SQL statements against BigQuery data warehouse.
038 */
039 public interface BigQueryService {
040
041 /**
042 * Runs a SQL query against the BigQuery warehouse and map the results into an object.
043 * @param namedParameters the named parameters for the SQL query
044 * @param sql the SQL query, must contain <code>@</code> as named parameter prefix
045 * @param builder the function mapping the result into an object
046 * @param <T> type of the result objects
047 * @return the flowable of objects mapped using the builder
048 */
049 <T> Flowable<T> query(Map<String, ?> namedParameters, String sql, Function<RowResult, T> builder);
050
051 /**
052 * Runs a SQL statement against the BigQuery warehouse.
053 * @param namedParameters the named parameters for the SQL statement
054 * @param sql the SQL statement, must contain <code>@</code> as named parameter prefix
055 */
056 void execute(Map<String, ?> namedParameters, String sql);
057
058 /**
059 * Run a SQL query against the BigQuery warehouse and map the results into an single object if present.
060 * @param sql the SQL query, must contain <code>@</code> as named parameter prefix
061 * @param builder the function mapping the result into an object
062 * @param <T> type of the result objects
063 * @return the optional holding the first returned result or an empty optinal
064 */
065 default <T> Optional<T> querySingle(String sql, Function<RowResult, T> builder) {
066 return querySingle(Collections.emptyMap(), sql, builder);
067 }
068
069 /**
070 * Run a SQL query against the BigQuery warehouse and map the results into an single object if present.
071 * @param namedParameters the named parameters for the SQL query
072 * @param sql the SQL query, must contain <code>@</code> as named parameter prefix
073 * @param builder the function mapping the result into an object
074 * @param <T> type of the result objects
075 * @return the optional holding the first returned result or an empty optinal
076 */
077 default <T> Optional<T> querySingle(Map<String, ?> namedParameters, String sql, Function<RowResult, T> builder) {
078 return Optional.ofNullable(query(namedParameters, sql, builder).blockingFirst(null));
079 }
080
081 /**
082 * Run a SQL query against the BigQuery warehouse and map the results into an object.
083 * @param sql the SQL query, must contain <code>@</code> as named parameter prefix
084 * @param builder the function mapping the result into an object
085 * @param <T> type of the result objects
086 * @return the flowable of objects mapped using the builder
087 */
088 default <T> Flowable<T> query(String sql, Function<RowResult, T> builder) {
089 return query(Collections.emptyMap(), sql, builder);
090 }
091
092 /**
093 * Runs a SQL statement against the BigQuery warehouse
094 * @param sql the SQL statement, must contain <code>@</code> as named parameter prefix
095 */
096 default void execute(String sql) {
097 execute(Collections.emptyMap(), sql);
098 }
099
100 /**
101 * Inserts the object into the database.
102 *
103 * @param object the object to be inserted
104 * @param dataset the name of the dataset
105 * @param table the name of the table
106 * @param <T> the type of the inserted object
107 * @return the very same object as has been passed into this method
108 */
109 default <T> T insert(T object, String dataset, String table) {
110 ParameterizedSql insert = generateInsert(object, dataset, table);
111 execute(insert.getNamedParameters(), insert.getSql());
112 return object;
113 }
114
115 /**
116 * Generates the insert statement with the named parameters prepared.
117 *
118 * @param object the object to be inserted
119 * @param dataset the name of the dataset
120 * @param table the name of the table
121 * @param <T> the type of the inserted object
122 * @return the sql and named parameters for the insertion of the given object
123 */
124 @SuppressWarnings("unchecked")
125 default <T> ParameterizedSql generateInsert(T object, final String dataset, String table) {
126 BeanIntrospection<T> introspection = BeanIntrospector.SHARED.getIntrospection((Class<T>) object.getClass());
127
128 Collection<BeanProperty<T, Object>> fields = introspection.getBeanProperties();
129
130 final List<String> keys = new ArrayList<>();
131 Map<String, Object> values = new LinkedHashMap<>();
132 for (BeanProperty<T, Object> field : fields) {
133 Object value = field.get(object);
134 String formattedName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
135
136 if (value != null) {
137 keys.add(formattedName);
138 values.put(formattedName, convertIfNecessary(value));
139 }
140 }
141
142 String builder = String.format(
143 "insert into %s.%s (%s) values (%s)",
144 dataset,
145 table,
146 String.join(", ", keys),
147 keys.stream().map(k -> "@" + k).collect(Collectors.joining(", "))
148 );
149
150 return ParameterizedSql.from(values, builder);
151 }
152
153 /**
154 * Converts the object value to the value suitable for the underlying database if necessary.
155 * @param object the object which might need conversion
156 * @return either the original object or a representation of the object which is suitable for the database
157 */
158 default Object convertIfNecessary(Object object) {
159 if (object instanceof Enum) {
160 return object.toString();
161 }
162
163 return object;
164 }
165
166 }
|